In [1]:
# Install the notebook's dependencies. `%pip` (rather than `!pip`) installs into
# the environment of the running kernel instead of whatever `pip` the shell finds.
%pip install -q datasets sentence-transformers faiss-cpu transformers accelerate


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m31.4/31.4 MB[0m [31m50.5 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import os
from dataclasses import dataclass
from typing import List, Tuple

import faiss
import numpy as np
import pandas as pd
import torch
from datasets import load_dataset
from sentence_transformers import SentenceTransformer
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
)

# Report the PyTorch build and whether a CUDA device is visible to it.
torch_version = torch.__version__
cuda_available = torch.cuda.is_available()
print("✅ Libraries imported.")
print("PyTorch version:", torch_version)
print("CUDA available:", cuda_available)


✅ Libraries imported.
PyTorch version: 2.8.0+cu126
CUDA available: False


In [3]:
# Pull the "test" split of the HumanEval benchmark from the Hugging Face Hub.
dataset = load_dataset("openai/openai_humaneval", split="test")
print(dataset)

# Keep only the three columns used downstream, as an independent copy so later
# edits never touch the frame produced by `to_pandas()`.
df = dataset.to_pandas().loc[:, ["task_id", "prompt", "canonical_solution"]].copy()
print("Number of tasks:", len(df))

# Preview the first rows.
df.head()


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

openai_humaneval/test-00000-of-00001.par(…):   0%|          | 0.00/83.9k [00:00<?, ?B/s]

Generating test split:   0%|          | 0/164 [00:00<?, ? examples/s]

Dataset({
    features: ['task_id', 'prompt', 'canonical_solution', 'test', 'entry_point'],
    num_rows: 164
})
Number of tasks: 164


Unnamed: 0,task_id,prompt,canonical_solution
0,HumanEval/0,from typing import List\n\n\ndef has_close_ele...,"for idx, elem in enumerate(numbers):\n ..."
1,HumanEval/1,from typing import List\n\n\ndef separate_pare...,result = []\n current_string = []\n ...
2,HumanEval/2,\n\ndef truncate_number(number: float) -> floa...,return number % 1.0\n
3,HumanEval/3,from typing import List\n\n\ndef below_zero(op...,balance = 0\n\n for op in operations:\n...
4,HumanEval/4,from typing import List\n\n\ndef mean_absolute...,mean = sum(numbers) / len(numbers)\n re...


In [4]:
# Sentence-embedding model used to vectorize the HumanEval prompts.
EMBED_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
embed_model = SentenceTransformer(EMBED_MODEL_NAME)
print("✅ Loaded embedding model:", EMBED_MODEL_NAME)

# One embedding per task prompt. Vectors are unit-normalized so that an inner
# product between two of them equals their cosine similarity, and cast to
# float32 before being handed to FAISS.
prompt_texts = list(df["prompt"])
embeddings = embed_model.encode(
    prompt_texts,
    normalize_embeddings=True,
    convert_to_numpy=True,
    show_progress_bar=True,
).astype("float32")
print("Embeddings shape:", embeddings.shape)

# Flat inner-product index holding every prompt embedding.
dim = embeddings.shape[-1]
index = faiss.IndexFlatIP(dim)
index.add(embeddings)
print("FAISS index size:", index.ntotal)


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

✅ Loaded embedding model: sentence-transformers/all-MiniLM-L6-v2


Batches:   0%|          | 0/6 [00:00<?, ?it/s]

Embeddings shape: (164, 384)
FAISS index size: 164


In [5]:
@dataclass
class RetrievedExample:
    """One HumanEval task returned by similarity search, with its match score."""
    # Task identifier taken from the dataset's "task_id" column.
    task_id: str
    # Task text taken from the dataset's "prompt" column.
    prompt: str
    # Reference implementation taken from the "canonical_solution" column.
    solution: str
    # Similarity score reported by the index search for this example.
    score: float


def retrieve_similar_tasks(
    query_text: str,
    k: int = 3,
) -> List[RetrievedExample]:
    """
    Embed a query and return the most similar HumanEval examples.

    Parameters
    ----------
    query_text : str
        Natural-language task description to search for.
    k : int, default 3
        Maximum number of examples to return. Values larger than the number of
        indexed tasks are clamped to the index size; values <= 0 yield an
        empty list.

    Returns
    -------
    list[RetrievedExample]
        Matches in the order returned by the index search.
    """
    # FAISS pads the result with index -1 when fewer than k neighbours exist,
    # and `df.iloc[-1]` would silently return the LAST row as a bogus match.
    # Clamping k to the index size prevents that padding from ever appearing.
    k = min(int(k), index.ntotal)
    if k <= 0:
        return []

    query_emb = embed_model.encode(
        [query_text],
        convert_to_numpy=True,
        normalize_embeddings=True,
    ).astype("float32")

    # search() returns (scores, ids), each shaped (n_queries, k); we sent one query.
    scores, idxs = index.search(query_emb, k)

    results = []
    for row_pos, score in zip(idxs[0], scores[0]):
        if row_pos < 0:
            # Defensive: skip the "no result" sentinel should it still appear.
            continue
        row = df.iloc[int(row_pos)]
        results.append(
            RetrievedExample(
                task_id=row["task_id"],
                prompt=row["prompt"],
                solution=row["canonical_solution"],
                score=float(score),
            )
        )
    return results


# Smoke-test the retriever: print the ids and rounded scores of the top matches.
test_query = "Write a function that returns the nth Fibonacci number."
examples = retrieve_similar_tasks(query_text=test_query, k=3)
for ex in examples:
    print(ex.task_id, "score:", round(ex.score, 3))


HumanEval/55 score: 0.749
HumanEval/46 score: 0.656
HumanEval/63 score: 0.645


In [6]:
# Small-ish example model; feel free to switch to a bigger one if you have GPU
# Examples you can try:
#   "bigcode/starcoder2-3b"
#   "bigcode/starcoder2-7b"
#   "codellama/CodeLlama-7b-Instruct-hf"
CODE_MODEL_NAME = "bigcode/starcoder2-3b"

# Half precision only when a GPU is present; stay in float32 on CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.float16 if device == "cuda" else torch.float32

tokenizer = AutoTokenizer.from_pretrained(CODE_MODEL_NAME)
# Reuse EOS as the padding token when the tokenizer defines none.
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id

model = AutoModelForCausalLM.from_pretrained(
    CODE_MODEL_NAME,
    dtype=dtype,         # `torch_dtype` is deprecated in current transformers releases
    device_map="auto",   # uses GPU if available
)

print("✅ Loaded code model:", CODE_MODEL_NAME)
print("Model device:", model.device)


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/958 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/700 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/12.1G [00:00<?, ?B/s]



✅ Loaded code model: bigcode/starcoder2-3b
Model device: cpu


In [10]:
def build_rag_prompt(
    user_task_description: str,
    retrieved: List[RetrievedExample],
    max_chars: int = 1600,
) -> str:
    """
    Construct the text prompt sent to the code LLM.

    The prompt contains the instructions, the new task description, and one
    section per retrieved HumanEval example (its prompt plus canonical solution).

    Parameters
    ----------
    user_task_description : str
        Description of the new task to solve; inserted verbatim.
    retrieved : list[RetrievedExample]
        Similar solved examples to include, in display order.
    max_chars : int, default 1600
        Per-snippet character budget. Any example prompt or solution longer
        than this is cut and suffixed with a "truncated" marker.

    Returns
    -------
    str
        The assembled prompt with leading/trailing whitespace stripped.

    Raises
    ------
    ValueError
        If ``max_chars`` is negative.
    """
    if max_chars < 0:
        raise ValueError("max_chars must be non-negative")

    def _clip(text: str) -> str:
        # Cap a snippet at max_chars and flag the cut so it is visible in the prompt.
        if len(text) > max_chars:
            return text[:max_chars] + "\n# ... truncated ..."
        return text

    examples_text = []
    for i, ex in enumerate(retrieved, start=1):
        prompt_snippet = _clip(ex.prompt)
        solution_snippet = _clip(ex.solution)

        examples_text.append(
            f"""
### Example {i} — task_id: {ex.task_id}
# Prompt
{prompt_snippet}

# Canonical solution
```python
{solution_snippet}
```

"""
        )

    examples_block = "\n".join(examples_text)

    final_prompt = f"""
You are an expert Python programmer. You will be given a new programming task
description, plus a few similar solved examples from the HumanEval dataset.
Use the examples as context and style guidance, but you MUST write a fresh
solution only for the new task.

* Write **one complete Python function** that solves the new task.
* Include the correct function signature.
* Use clear variable names and minimal comments.
* Do NOT include tests, explanations, or extra text outside the code.
* Output only valid Python code.

## NEW TASK DESCRIPTION

{user_task_description}

## SIMILAR HUMAN-EVAL EXAMPLES

{examples_block}

Now write the Python function that solves the NEW TASK.
Remember: output ONLY code.
"""
    return final_prompt.strip()


In [8]:

### 🧩 Cell 9 – Full RAG pipeline: embed → retrieve → generate

def rag_generate_code(
    task_description: str,
    k: int = 3,
    max_new_tokens: int = 256,
) -> Tuple[str, List[RetrievedExample], str]:
    """
    Run the whole retrieve-then-generate flow for one task description.

    Steps: look up the ``k`` most similar HumanEval examples, fold them into a
    single prompt together with the task, then ask the code LLM to complete it.

    Parameters
    ----------
    task_description : str
        Natural-language description of the function to generate.
    k : int, default 3
        Number of similar examples to retrieve.
    max_new_tokens : int, default 256
        Generation budget forwarded to the code LLM.

    Returns
    -------
    tuple[str, list[RetrievedExample], str]
        ``(generated_code, retrieved_examples, full_prompt)``; the prompt is
        returned for debugging / inspection.
    """
    similar_examples = retrieve_similar_tasks(task_description, k=k)
    full_prompt = build_rag_prompt(task_description, similar_examples)
    generated_code = generate_code_from_prompt(
        full_prompt,
        max_new_tokens=max_new_tokens,
    )
    return generated_code, similar_examples, full_prompt

In [13]:
def generate_code_from_prompt(
    prompt: str,
    max_new_tokens: int = 256,
    temperature: float = 0.2,
    top_p: float = 0.95,
) -> str:
    """
    Generate Python code from a text prompt using the loaded code LLM.

    Parameters
    ----------
    prompt : str
        Full prompt text; it is tokenized without truncation.
    max_new_tokens : int, default 256
        Upper bound on the number of tokens generated after the prompt.
    temperature : float, default 0.2
        Sampling temperature. A value <= 0 (or ``None``) switches to greedy
        decoding instead of failing, because sampling requires a strictly
        positive temperature.
    top_p : float, default 0.95
        Nucleus-sampling threshold; only used when sampling is active.

    Returns
    -------
    str
        The decoded continuation only (the prompt tokens are stripped), with
        special tokens removed.
    """
    # Tokenize; the returned attention mask is passed to generate() explicitly
    # because the pad token may be the same id as the EOS token.
    encoded = tokenizer(
        prompt,
        return_tensors="pt",
        padding=True,
        truncation=False,
    ).to(model.device)

    input_ids = encoded.input_ids
    attention_mask = encoded.attention_mask

    generation_kwargs = dict(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.pad_token_id,
        eos_token_id=tokenizer.eos_token_id,
    )
    if temperature is not None and temperature > 0:
        generation_kwargs.update(
            do_sample=True,
            temperature=temperature,
            top_p=top_p,
        )
    else:
        # Greedy decoding: sampling knobs are omitted so they are neither
        # validated nor warned about.
        generation_kwargs["do_sample"] = False

    with torch.no_grad():
        output_ids = model.generate(**generation_kwargs)

    # generate() returns prompt + continuation; keep only the continuation.
    prompt_length = input_ids.shape[1]
    generated_part = output_ids[0][prompt_length:]

    return tokenizer.decode(generated_part, skip_special_tokens=True)


In [None]:
# Plain-English description of the function we want the model to write.
user_task = """
Write a function sum_even_numbers(nums) that takes a list of integers
and returns the sum of the even numbers. If there are no even numbers,
return 0.
"""

# Retrieve similar examples, build the prompt, and generate — all in one call.
generated_code, retrieved_examples, used_prompt = rag_generate_code(
    user_task,
    k=3,
    max_new_tokens=256,
)

# Which HumanEval tasks were used as context, and how similar each one was.
print("===== Retrieved examples (task_id, similarity score) =====")
for ex in retrieved_examples:
    print(f"{ex.task_id} | score={ex.score:.3f}")

# The model's completion.
print("\n===== Generated code =====\n", generated_code, sep="\n")


In [None]:
# Debug aid: show the start of the exact prompt that was sent to the LLM.
print(
    "===== Full RAG prompt (first 4000 chars) =====",
    used_prompt[:4000],  # print first 4000 chars
    sep="\n",
)