### Library imports

In [1]:
import json
import re
from typing import Any, Dict, List

from datasets import load_dataset

from rapidfireai.infer.experiment import Experiment

INFO 10-29 08:00:17 __init__.py:190] Automatically detected platform cuda.


### Dataset

In [None]:
dataset = load_dataset("openai/gsm8k", "main", split="train").select(range(512))
print(f"Loaded {len(dataset)} training samples")

Loaded 2048 test samples


### Inference Pipeline Config using `VLLMModelConfig`

##### We will now build a fast inference pipeline using vLLM with a full suite of context engineering capabilities including retrieval augmented generation (RAG), in-context learning with fewshot examples, with evaluation metrics powered by online aggregation.

You will be able to experiment with and tune configurations:
- Document splitting and chunking
- Embeddings
- Document and vector stores
- Retrieval techniques
- Reranking techniques
- Prompt engineering and in-context learning
- Dynamically selecting fewshot examples
- Model generation settings
- Post-processing
- Evaluation metrics


##### We will use locally hosted models for both embedding and generation

### RAG Implementation using `LangChainRagSpec`

In [None]:
from langchain_community.document_loaders import DirectoryLoader
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter

from rapidfireai.infer.rag.rag_pipeline import LangChainRagSpec

batch_size = 32  # Reduced from 128 for T4

# Shared document loader and text splitter
document_loader = DirectoryLoader(
    path="../data/gsm8k",
    glob="*.txt",
    recursive=True,
    sample_seed=1337
)
text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
    encoding_name="gpt2", chunk_size=128, chunk_overlap=32
)

# GPU-based RAG - Configuration 1 (retrieve 2 documents - reduced from 3)
rag_gpu_k2 = LangChainRagSpec(
    document_loader=document_loader,
    text_splitter=text_splitter,
    embedding_cls=HuggingFaceEmbeddings,
    embedding_kwargs={
        'model_name': "sentence-transformers/all-MiniLM-L6-v2",
        'encode_kwargs': {'normalize_embeddings': True, 'batch_size': batch_size}
    },
    retriever=None,
    vector_store=None,
    search_type="similarity",
    search_kwargs={"k": 2},  # Retrieve 2 documents
    reranker=None,
    enable_gpu_search=True
)

# GPU-based RAG - Configuration 2 (retrieve 3 documents)
rag_gpu_k3 = LangChainRagSpec(
    document_loader=document_loader,
    text_splitter=text_splitter,
    embedding_cls=HuggingFaceEmbeddings,
    embedding_kwargs={
        'model_name': "sentence-transformers/all-MiniLM-L6-v2",
        'encode_kwargs': {'normalize_embeddings': True, 'batch_size': batch_size}
    },
    retriever=None,
    vector_store=None,
    search_type="similarity",
    search_kwargs={"k": 3},  # Retrieve 3 documents
    reranker=None,
    enable_gpu_search=True
)

### Few Shot Prompt Manager using `PromptManager`

In [None]:
from langchain_community.vectorstores import FAISS
from langchain_core.example_selectors import SemanticSimilarityExampleSelector
from langchain_core.prompts import PromptTemplate

from rapidfireai.infer.rag.prompt_manager import PromptManager

INSTRUCTIONS = "You are a helpful assistant that is good at solving math problems. You think step by step and ALWAYS output the final answer after '####'."

# Keep same examples but reduce k values

# Prompt Manager Configuration 1 (1 fewshot example - reduced from 2)
fewshot_prompt_manager_k1 = PromptManager(
    instructions=INSTRUCTIONS,
    examples=examples,  # Same examples as before
    embedding_cls=HuggingFaceEmbeddings,
    embedding_kwargs={
        'model_name': "sentence-transformers/all-MiniLM-L6-v2",
        'encode_kwargs': {'normalize_embeddings': True, 'batch_size': batch_size}
    },
    example_selector_cls=SemanticSimilarityExampleSelector,
    example_prompt_template=PromptTemplate(
        input_variables=["question", "answer"],
        template="Question: {question}\nAnswer: {answer}",
    ),
    k=1,  # 1 fewshot example
)

# Prompt Manager Configuration 2 (2 fewshot examples - reduced from 3)
fewshot_prompt_manager_k2 = PromptManager(
    instructions=INSTRUCTIONS,
    examples=examples,
    embedding_cls=HuggingFaceEmbeddings,
    embedding_kwargs={
        'model_name': "sentence-transformers/all-MiniLM-L6-v2",
        'encode_kwargs': {'normalize_embeddings': True, 'batch_size': batch_size}
    },
    example_selector_cls=SemanticSimilarityExampleSelector,
    example_prompt_template=PromptTemplate(
        input_variables=["question", "answer"],
        template="Question: {question}\nAnswer: {answer}",
    ),
    k=2,  # 2 fewshot examples
)

### Context engineering using `ContextGenerator` = RAG + Prompt Manager

In [None]:
from rapidfireai.infer.rag.context_generator import ContextGenerator

# Context Generator 1: k=2 RAG docs + k=1 fewshot example (lightweight)
context_generator_1 = ContextGenerator(
    rag_spec=rag_gpu_k2,
    prompt_manager=fewshot_prompt_manager_k1
)

# Context Generator 2: k=3 RAG docs + k=2 fewshot examples (richer context)
context_generator_2 = ContextGenerator(
    rag_spec=rag_gpu_k3,
    prompt_manager=fewshot_prompt_manager_k2
)

### Model config using `VLLMModelConfig`

In [None]:
from rapidfireai.infer.utils.config import VLLMModelConfig

# Pipeline 1: Qwen 0.5B + Context 1 (ultra-light, fits easily in T4)
pipeline_1 = VLLMModelConfig(
    model_config={
        "model": "Qwen/Qwen2.5-0.5B-Instruct",  # 0.5B params
        "dtype": "half",
        "gpu_memory_utilization": 0.5,  # Conservative for T4
        "tensor_parallel_size": 1,
        "distributed_executor_backend": "mp",
        "enable_chunked_prefill": True,
        "enable_prefix_caching": True,
        "max_model_len": 1536,  # Reduced from 2048
        "disable_log_stats": True,
    },
    sampling_params={
        "temperature": 0.3,
        "top_p": 0.9,
        "max_tokens": 384,  # Reduced from 512
    },
    context_generator=context_generator_1
)

# Pipeline 2: Qwen 1.5B + Context 2 (still fits in T4 with room to spare)
pipeline_2 = VLLMModelConfig(
    model_config={
        "model": "Qwen/Qwen2.5-1.5B-Instruct",  # 1.5B params
        "dtype": "half",
        "gpu_memory_utilization": 0.5,
        "tensor_parallel_size": 1,
        "distributed_executor_backend": "mp",
        "enable_chunked_prefill": True,
        "enable_prefix_caching": True,
        "max_model_len": 1536,
        "disable_log_stats": True,
    },
    sampling_params={
        "temperature": 0.7,
        "top_p": 0.95,
        "max_tokens": 384,
    },
    context_generator=context_generator_2
)

# Create list of (name, config) tuples - ONLY 2 pipelines
pipelines = [
    ("0.5B_LightContext", pipeline_1),
    ("1.5B_RichContext", pipeline_2),
]

print(f"Configured {len(pipelines)} pipelines (T4-optimized):")
print(f"  - Pipeline 1: Qwen 0.5B + light context (2 RAG docs, 1 fewshot)")
print(f"  - Pipeline 2: Qwen 1.5B + rich context (3 RAG docs, 2 fewshot)")

Configured 3 pipelines:
  - Pipeline 1 & 2: Same context (3 RAG docs, 2 fewshot), different models (3B vs 1.5B)
  - Pipeline 1 & 3: Same model (3B), different contexts (Context1 vs Context2)


### Utility, Preprocessor, Postprocessor, Compute Metrics

In [7]:
def extract_solution(answer):
    solution = re.search("#### (\\-?[0-9\\.\\,]+)", answer)
    if solution is None:
        return "0"
    final_solution = solution.group(0)
    final_solution = final_solution.split("#### ")[1].replace(",", "")
    return final_solution

def preprocess_fn(batch: Dict[str, List], context_generator: ContextGenerator) -> Dict[str, List]:

    INSTRUCTIONS = context_generator.get_instructions()

    return {
        "prompts": [
            [
                {
                    "role": "system",
                    "content": INSTRUCTIONS
                },
                {
                    "role": "user",
                    "content": f'Here are some examples: \n{examples}. \nHere is some additional context:\n{context}. \nNow answer the following question:\n{question}'
                }
            ]
            for question, examples,context in zip(
                batch["question"],
                context_generator.get_fewshot_examples(user_queries=batch["question"]),
                context_generator.get_context(batch_queries=batch["question"])
            )
        ],
        **batch,
    }

def postprocess_fn(batch: Dict[str, List]) -> Dict[str, List]:
    batch["model_answer"] = [extract_solution(answer) for answer in batch["generated_text"]]
    batch["ground_truth"] = [extract_solution(answer) for answer in batch["answer"]]
    return batch

def compute_metrics_fn(batch: Dict[str, List]) -> Dict[str, Dict[str, Any]]:
    correct = sum(1 for pred, gt in zip(batch["model_answer"], batch["ground_truth"])
                  if pred == gt)
    total = len(batch["model_answer"])
    return {
        "Correct": {"value": correct},
        "Total": {"value": total},
    }

def accumulate_metrics_fn(aggregated_metrics: Dict[str, List]) -> Dict[str, Dict[str, Any]]:
    # aggregated_metrics is a dict of lists: {"Correct": [5, 3, 7], "Total": [10, 8, 12]}
    correct = sum(m.get("value", 0) for m in aggregated_metrics.get("Correct", [{}]))
    total = sum(m.get("value", 0) for m in aggregated_metrics.get("Total", [{}]))
    accuracy = correct / total if total > 0 else 0
    return {
        "Total": {"value": total},
        "Correct": {"value": correct, "is_distributive": True, "value_range": (0, 1)}, # 0 (min) if not correct, 1 if correct (max)
        "Accuracy": {"value": accuracy, "is_algebraic": True, "value_range": (0, 1)} # Algebraic metric for online aggregation
    }

### Create Experiment

In [None]:
# T4 is a single GPU, so use num_actors=1
experiment = Experiment(
    experiment_name="trial-context-generator-colab-t4",
    num_actors=1  # Changed from 2
)

2025-10-29 08:00:21,051	INFO worker.py:2004 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


 * Serving Flask app 'rapidfireai.infer.dispatcher.dispatcher'
 * Debug mode: off


Loading safetensors checkpoint shards:   0% Completed | 0/2 [00:00<?, ?it/s]
Loading safetensors checkpoint shards:  50% Completed | 1/2 [00:01<00:01,  1.09s/it]
Loading safetensors checkpoint shards: 100% Completed | 2/2 [00:01<00:00,  1.09it/s]
Loading safetensors checkpoint shards: 100% Completed | 2/2 [00:01<00:00,  1.06it/s]
[36m(QueryProcessingActor pid=1459581)[0m 
Capturing CUDA graph shapes:   0%|          | 0/35 [00:00<?, ?it/s]
Capturing CUDA graph shapes:   3%|▎         | 1/35 [00:00<00:17,  1.97it/s]
Capturing CUDA graph shapes:   6%|▌         | 2/35 [00:00<00:15,  2.07it/s]
Capturing CUDA graph shapes:   9%|▊         | 3/35 [00:01<00:15,  2.05it/s]
Capturing CUDA graph shapes:  11%|█▏        | 4/35 [00:01<00:14,  2.11it/s]
Capturing CUDA graph shapes:  14%|█▍        | 5/35 [00:02<00:14,  2.13it/s]
Capturing CUDA graph shapes:  17%|█▋        | 6/35 [00:02<00:13,  2.15it/s]
Capturing CUDA graph shapes:  20%|██        | 7/35 [00:03<00:12,  2.16it/s]
Capturing CUDA graph sh

### Run Experiment

In [None]:
results_by_run = experiment.run_evals(
    configs=pipelines,
    dataset=dataset,
    batch_size=batch_size,  # 32
    num_shards=4,
    preprocess_fn=preprocess_fn,
    postprocess_fn=postprocess_fn,
    compute_metrics_fn=compute_metrics_fn,
    accumulate_metrics_fn=accumulate_metrics_fn,
    online_strategy_kwargs={"strategy_name": "normal", "confidence_level": 0.95, "use_fpc": True}
)

=== Context Generation ===


Context ID,Context Hash,Status,Duration,Details
2,0867fad4...,Complete,38.8s,"FAISS, GPU"
3,1ef40f9a...,Complete,39.9s,"FAISS, GPU"



=== Multi-Pipeline Experiment Progress ===


Pipeline ID,Name,Model,Progress,Confidence,Accuracy,Throughput
102,3B_Context1,Qwen/Qwen2.5-3B-Instruct,3/4,0.013,49.22%,3.5/s
103,1.5B_Context1,Qwen/Qwen2.5-1.5B-Instruct,3/4,0.011,28.26%,3.6/s
104,3B_Context2,Qwen/Qwen2.5-3B-Instruct,0/4,-,-,-


### End Experiment

In [None]:
experiment.end()

### View Results

In [None]:
print(f"\n{'='*80}")
print("RESULTS FOR ALL PIPELINES")
print(f"{'='*80}\n")

for pipeline_id, (aggregated_results, metrics) in results_by_pipeline.items():
    # Get pipeline name from the original configs
    pipeline_name = [name for name, _ in pipelines][pipeline_id - 1] if pipeline_id <= len(pipelines) else f"Pipeline {pipeline_id}"

    print(f"\n{'-'*80}")
    print(f"Pipeline: {pipeline_name.upper()} (ID: {pipeline_id})")
    print(f"{'-'*80}")
    print(json.dumps(metrics, indent=4))
    print()

In [None]:
print(f"\n{'='*80}")
print("SAMPLE OUTPUTS FROM EACH PIPELINE")
print(f"{'='*80}\n")

num_examples_to_show = 2  # Show 2 examples per pipeline

for pipeline_id, (aggregated_results, metrics) in results_by_pipeline.items():
    # Get pipeline name from the original configs
    pipeline_name = [name for name, _ in pipelines][pipeline_id - 1] if pipeline_id <= len(pipelines) else f"Pipeline {pipeline_id}"

    print(f"\n{'-'*80}")
    print(f"Pipeline: {pipeline_name.upper()} (ID: {pipeline_id})")
    print(f"{'-'*80}\n")

    samples_available = min(num_examples_to_show, metrics['Samples Processed']['value'])

    for i in range(samples_available):
        print(f"\n{'~'*40}")
        print(f"Example {i+1}/{samples_available}")
        print(f"{'~'*40}")

        prompt = aggregated_results['prompts'][i]
        print(f"\nSystem Instructions:")
        print(f"{prompt[0]['content'][:200]}...")  # Truncate long instructions

        print(f"\nUser Query + Context:")
        print(f"{prompt[1]['content'][:300]}...")  # Truncate long context

        print(f"\nModel Output:")
        print(f"{aggregated_results['generated_text'][i]}")

        print(f"\nGround Truth:")
        print(f"{aggregated_results['ground_truth'][i]}")

        print(f"\nExtracted Answer:")
        print(f"Model: {aggregated_results['model_answer'][i]} | Truth: {aggregated_results['ground_truth'][i]}")

        is_correct = aggregated_results['model_answer'][i] == aggregated_results['ground_truth'][i]
        print(f"✓ CORRECT" if is_correct else "✗ INCORRECT")

    print(f"\n{'-'*80}\n")
