In [1]:
# Mount Google Drive so the notebook can read and write under /content/drive.
from google.colab import drive
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
!pip install -q --force-reinstall numpy==1.26.4
!pip install -q --force-reinstall \
    faiss-cpu==1.8.0.post1 \
    sentence-transformers==2.7.0 \
    transformers==4.41.2 \
    accelerate==0.30.1 \
    bitsandbytes==0.43.1 \
    pandas \
    pynvml

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
rasterstats 0.20.0 requires rasterio>=1.0, which is not installed.
pymc 5.27.1 requires pytensor<2.38.0,>=2.37.0, which is not installed.
albucore 0.0.24 requires opencv-python-headless>=4.9.0.80, which is not installed.
optax 0.2.7 requires jax>=0.5.3, which is not installed.
optax 0.2.7 requires jaxlib>=0.5.3, which is not installed.
albumentations 2.0.8 requires opencv-python-headless>=4.9.0.80, which is not installed.
orbax-checkpoint 0.11.32 requires jax>=0.6.0, which is not installed.
dopamine-rl 4.1.2 requires jax>=0.1.72, which is not installed.
dopamine-rl 4.1.2 requires jaxlib>=0.1.51, which is not installed.
dopamine-rl 4.1.2 requires opencv-python>=3.4.8.29, which is not installed.
pysal 25.7 requires tobler>=0.12.1, which is not installed.
flax 0.11.2 requires jax>=0.6.0, which is not installed.

In [3]:
import os, re, json, time, math, threading, statistics
from pathlib import Path

import numpy as np
import pandas as pd
import faiss

import torch
from sentence_transformers import SentenceTransformer, CrossEncoder
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline

from pynvml import (
    nvmlInit, nvmlShutdown, nvmlDeviceGetHandleByIndex,
    nvmlDeviceGetPowerUsage, nvmlDeviceGetName
)

# Environment banner: library versions and, when present, the CUDA device name.
print(f"NumPy: {np.__version__}")
print(f"Torch: {torch.__version__} CUDA: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")

  import pynvml  # type: ignore[import]


NumPy: 1.26.4
Torch: 2.10.0+cu128 CUDA: True
GPU: Tesla T4


In [4]:
# Project layout on the mounted Drive.
BASE_DIR = Path("/content/drive/MyDrive/PerfWattLab_RAG")
INDEX_DIR = BASE_DIR / "index"
OUT_DIR = BASE_DIR / "outputs"
SCRIPT_DIR = BASE_DIR / "scripts"

# Output folders are created on demand; the index folder must already exist.
OUT_DIR.mkdir(parents=True, exist_ok=True)
SCRIPT_DIR.mkdir(parents=True, exist_ok=True)

index_path = INDEX_DIR / "faiss.index"
chunks_path = INDEX_DIR / "chunks.json"

assert index_path.exists(), f"Missing {index_path}"
assert chunks_path.exists(), f"Missing {chunks_path}"

index = faiss.read_index(str(index_path))
# Explicit encoding so the load does not depend on the platform default.
with open(chunks_path, "r", encoding="utf-8") as f:
    chunks = json.load(f)

# retrieve() uses the FAISS row id as a position in `chunks`, so the two must line up;
# a mismatch would otherwise surface later as an IndexError or as wrong context text.
assert index.ntotal == len(chunks), (
    f"Index/chunk mismatch: FAISS has {index.ntotal} vectors but chunks.json has {len(chunks)} entries"
)

print("Loaded FAISS ntotal:", index.ntotal)
print("Loaded chunks:", len(chunks))
if chunks:
    print("Example chunk id:", chunks[0]["chunk_id"])

Loaded FAISS ntotal: 5
Loaded chunks: 5
Example chunk id: doc1.txt::chunk0


In [5]:
# Model identifiers for the three stages: query embedding, reranking, and answer generation.
embed_model_name = "sentence-transformers/all-MiniLM-L6-v2"
reranker_name = "cross-encoder/ms-marco-MiniLM-L-6-v2"
gen_model_name = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"

# Embedding model used by retrieve(); moved to the GPU when CUDA is available.
embedder = SentenceTransformer(embed_model_name)
if torch.cuda.is_available():
    embedder = embedder.to("cuda")

# Cross-encoder used by rerank(); placed on CUDA when available, otherwise on CPU.
reranker = CrossEncoder(reranker_name, device="cuda" if torch.cuda.is_available() else "cpu")

# Generator: weights are loaded in float16 when CUDA is available and float32 otherwise;
# device placement is delegated to device_map="auto".
tokenizer = AutoTokenizer.from_pretrained(gen_model_name, use_fast=True)
model = AutoModelForCausalLM.from_pretrained(
    gen_model_name,
    device_map="auto",
    torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
)
model.eval()  # evaluation mode; the notebook only runs inference

# Text-generation pipeline built on the already-loaded model and tokenizer
# (used by generate_pipeline; generate_direct calls model.generate itself).
gen_pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)

print("Embedder:", embed_model_name)
print("Reranker:", reranker_name)
print("Generator:", gen_model_name)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]



config.json:   0%|          | 0.00/612 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/350 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/794 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/90.9M [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/132 [00:00<?, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/551 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/608 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/2.20G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

Embedder: sentence-transformers/all-MiniLM-L6-v2
Reranker: cross-encoder/ms-marco-MiniLM-L-6-v2
Generator: TinyLlama/TinyLlama-1.1B-Chat-v1.0


In [6]:
def retrieve(query: str, top_k: int = 10):
    """Embed `query`, search the FAISS index, and return the matching chunks.

    Args:
        query: Natural-language question to embed.
        top_k: Maximum number of neighbours requested from the index.

    Returns:
        (results, elapsed_ms) where `results` is a list of dicts with keys
        "chunk_id", "doc_id", "text" and "score", in the order FAISS returned them,
        and `elapsed_ms` covers embedding plus index search (not result assembly).
        Fewer than `top_k` results are returned when the index holds fewer vectors.
    """
    t0 = time.perf_counter()
    q_emb = embedder.encode([query], convert_to_numpy=True, normalize_embeddings=True).astype(np.float32)
    scores, idxs = index.search(q_emb, top_k)
    t1 = time.perf_counter()

    results = []
    for score, i in zip(scores[0], idxs[0]):
        i = int(i)
        # FAISS pads the label row with -1 when it finds fewer than top_k neighbours.
        # Without this guard chunks[-1] would silently duplicate the last chunk.
        if i < 0:
            continue
        c = chunks[i]
        results.append({
            "chunk_id": c["chunk_id"],
            "doc_id": c["doc_id"],
            "text": c["text"],
            "score": float(score),
        })
    return results, (t1 - t0) * 1000.0

def rerank(query: str, retrieved, top_k: int = 5):
    """Score each retrieved chunk against `query` with the cross-encoder and keep the best.

    Args:
        query: The user question.
        retrieved: List of result dicts as produced by retrieve(); each must have "text".
            The dicts are updated in place with a "rerank_score" key.
        top_k: Number of highest-scoring chunks to keep.

    Returns:
        (reranked, elapsed_ms): the `top_k` dicts sorted by descending "rerank_score"
        and the wall-clock time spent scoring and sorting, in milliseconds.
    """
    t0 = time.perf_counter()
    # Nothing to score: skip the model call, which is not guaranteed to accept an empty batch.
    if not retrieved:
        return [], (time.perf_counter() - t0) * 1000.0

    pairs = [(query, r["text"]) for r in retrieved]
    scores = reranker.predict(pairs)
    for r, s in zip(retrieved, scores):
        r["rerank_score"] = float(s)
    reranked = sorted(retrieved, key=lambda x: x["rerank_score"], reverse=True)[:top_k]
    t1 = time.perf_counter()
    return reranked, (t1 - t0) * 1000.0

def build_prompt(query: str, context_chunks):
    """Assemble the generation prompt from the question and the selected chunks.

    Each chunk's "text" is numbered from 1 ("[1] ...", "[2] ...") and the numbered
    entries are separated by a blank line inside the "Context:" section.
    """
    numbered = []
    for position, chunk in enumerate(context_chunks, start=1):
        numbered.append(f"[{position}] {chunk['text']}")
    context = "\n\n".join(numbered)

    # One entry per output line; empty strings produce the blank separator lines.
    prompt_lines = (
        "You are a helpful assistant. Use the context to answer the question.",
        "If the context is not enough, say you are not sure.",
        "",
        "Context:",
        f"{context}",
        "",
        "Question:",
        f"{query}",
        "",
        "Answer:",
    )
    return "\n".join(prompt_lines)

def generate_pipeline(prompt: str, max_new_tokens: int, do_sample: bool, temperature: float, top_p: float):
    """Generate an answer through the Hugging Face text-generation pipeline.

    Args:
        prompt: Full prompt text.
        max_new_tokens: Upper bound on generated tokens.
        do_sample: True for sampling, False for greedy decoding.
        temperature: Sampling temperature; only forwarded when `do_sample` is True.
        top_p: Nucleus sampling threshold; only forwarded when `do_sample` is True.

    Returns:
        (text, elapsed_ms): the stripped completion (prompt excluded) and the
        wall-clock time of the pipeline call in milliseconds.
    """
    t0 = time.perf_counter()
    out = gen_pipe(
        prompt,
        max_new_tokens=max_new_tokens,
        do_sample=do_sample,
        # With greedy decoding the sampling knobs are unset (None), matching generate_direct.
        # Passing concrete values such as temperature=0.0 alongside do_sample=False makes
        # the generation-config validation complain on every call.
        temperature=temperature if do_sample else None,
        top_p=top_p if do_sample else None,
        return_full_text=False,
    )[0]["generated_text"]
    if torch.cuda.is_available():
        torch.cuda.synchronize()
    t1 = time.perf_counter()
    return out.strip(), (t1 - t0) * 1000.0

@torch.inference_mode()
def generate_direct(prompt: str, max_new_tokens: int, do_sample: bool, temperature: float, top_p: float):
    t0 = time.perf_counter()
    inputs = tokenizer(prompt, return_tensors="pt")
    if torch.cuda.is_available():
        inputs = {k: v.to("cuda") for k, v in inputs.items()}

    out_ids = model.generate(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=do_sample,
        temperature=temperature if do_sample else None,
        top_p=top_p if do_sample else None,
        use_cache=True,
    )

    if torch.cuda.is_available():
        torch.cuda.synchronize()

    t1 = time.perf_counter()
    text = tokenizer.decode(out_ids[0], skip_special_tokens=True)
    return text.strip(), (t1 - t0) * 1000.0

def rag_once(query: str, gen_mode: str, gen_cfg: dict, retrieve_k: int = 10, rerank_k: int = 5):
    """Run one retrieve -> rerank -> generate pass and report per-stage timings.

    `gen_mode` selects the generation path ("pipeline" or "direct"); any other value
    raises ValueError after retrieval and reranking have run. `gen_cfg` must provide
    "max_new_tokens", "do_sample", "temperature" and "top_p".

    Returns a dict with the stage timings in milliseconds, their sum, and the
    last 300 characters of the generated text.
    """
    candidates, retrieval_ms = retrieve(query, top_k=retrieve_k)
    context_chunks, rerank_ms = rerank(query, candidates, top_k=rerank_k)
    prompt = build_prompt(query, context_chunks)

    if gen_mode not in ("pipeline", "direct"):
        raise ValueError("gen_mode must be pipeline or direct")

    # Both generators share one signature, so pick the callable and invoke it once.
    generate = generate_pipeline if gen_mode == "pipeline" else generate_direct
    answer, generation_ms = generate(
        prompt,
        max_new_tokens=gen_cfg["max_new_tokens"],
        do_sample=gen_cfg["do_sample"],
        temperature=gen_cfg["temperature"],
        top_p=gen_cfg["top_p"],
    )

    timings = {
        "retrieval_ms": retrieval_ms,
        "rerank_ms": rerank_ms,
        "generation_ms": generation_ms,
    }
    timings["total_ms"] = retrieval_ms + rerank_ms + generation_ms
    timings["answer_preview"] = answer[-300:]
    return timings

In [7]:
# One short greedy request per generation path so that one-time start-up costs
# are paid before any timed run.
warmup_q = "Explain dynamic batching in simple terms."
warm_cfg = dict(max_new_tokens=64, do_sample=False, temperature=0.0, top_p=1.0)

for warm_mode in ("pipeline", "direct"):
    print("Warmup", warm_mode)
    _ = rag_once(warmup_q, gen_mode=warm_mode, gen_cfg=warm_cfg)

print("Warmup done")

Warmup pipeline




Warmup direct
Warmup done


In [8]:
class PowerSampler:
    """Background thread that polls GPU power draw through NVML at a fixed rate.

    Typical use: start(), run the workload, stop(), then read `samples` or to_df().
    Each sample is a dict {"t_s": seconds since start(), "power_w": watts}.
    """

    def __init__(self, hz: float = 5.0, device_index: int = 0):
        """
        Args:
            hz: Target polling frequency in samples per second; must be positive.
            device_index: NVML index of the GPU to monitor.

        Raises:
            ValueError: if `hz` is not positive (it is used as a divisor for the period).
        """
        if hz <= 0:
            raise ValueError("hz must be positive")
        self.hz = hz
        self.device_index = device_index
        self.samples = []
        self._stop = threading.Event()
        self._thread = None
        self._nvml_active = False  # True between a successful nvmlInit and nvmlShutdown

    def _shutdown_nvml(self):
        """Shut NVML down once, and only if this sampler initialised it."""
        if self._nvml_active:
            self._nvml_active = False
            nvmlShutdown()

    def start(self):
        """Initialise NVML, resolve the device, and start the polling thread."""
        nvmlInit()
        self._nvml_active = True
        try:
            self.handle = nvmlDeviceGetHandleByIndex(self.device_index)
            self.gpu_name = nvmlDeviceGetName(self.handle)
        except Exception:
            # Do not leave NVML initialised when the device lookup fails.
            self._shutdown_nvml()
            raise
        self._stop.clear()
        self.t0 = time.perf_counter()
        period = 1.0 / self.hz

        def loop():
            while not self._stop.is_set():
                t = time.perf_counter() - self.t0
                mw = nvmlDeviceGetPowerUsage(self.handle)  # milliwatts
                self.samples.append({"t_s": t, "power_w": mw / 1000.0})
                # Event.wait is an interruptible sleep, so stop() does not have to
                # wait out a full sampling period.
                self._stop.wait(period)

        self._thread = threading.Thread(target=loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Stop polling and release NVML. Safe to call repeatedly or without start()."""
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
            self._thread = None
        self._shutdown_nvml()

    def to_df(self):
        """Return the samples as a DataFrame with columns "t_s" and "power_w".

        The columns are present even when no sample was collected.
        """
        return pd.DataFrame(self.samples, columns=["t_s", "power_w"])

In [9]:
gpu_lock = threading.Lock()  # held during generation so only one request generates at a time

def run_fixed_rate(
    name: str,
    gen_mode: str,
    gen_cfg: dict,
    queries: list,
    concurrency: int,
    target_rps: float,
    retrieve_k: int = 10,
    rerank_k: int = 5,
):
    """Drive `queries` through the RAG path at a fixed arrival rate while sampling GPU power.

    One thread is created per query. Each sleeps until its scheduled arrival time
    (i / target_rps seconds after the run starts), then waits for one of `concurrency`
    slots, runs retrieval and reranking, and finally generates under `gpu_lock`.

    Args:
        name: Configuration label copied into every result row.
        gen_mode: "pipeline" or "direct".
        gen_cfg: Dict with "max_new_tokens", "do_sample", "temperature", "top_p".
        queries: Query strings; one request is issued per entry.
        concurrency: Maximum number of requests in flight (>= 1).
        target_rps: Scheduled arrival rate in requests per second (> 0).
        retrieve_k: Neighbours requested from the index.
        rerank_k: Chunks kept after reranking.

    Returns:
        (res_df, power_df). `res_df` has one row per completed request with the stage
        timings and "latency_ms" (measured from slot acquisition to completion), plus
        "queue_wait_ms" (scheduled arrival to slot acquisition) and "e2e_latency_ms"
        (scheduled arrival to completion). Run-level columns "run_seconds",
        "run_joules", "energy_j_per_query" and "achieved_qps" are repeated on every row.
        `power_df` holds the raw power samples.

    Raises:
        ValueError: if `gen_mode` is not "pipeline" or "direct".
    """
    assert concurrency >= 1
    assert target_rps > 0
    # Same validation as rag_once: a typo must not silently benchmark the direct path.
    if gen_mode not in ("pipeline", "direct"):
        raise ValueError("gen_mode must be pipeline or direct")
    generate = generate_pipeline if gen_mode == "pipeline" else generate_direct

    results = []
    sem = threading.Semaphore(concurrency)
    sampler = PowerSampler(hz=5.0)
    sampler.start()

    try:
        # The schedule starts once the sampler is running, so NVML start-up time
        # neither delays the first arrival nor counts towards run_seconds.
        start0 = time.perf_counter()
        arrivals = [start0 + i / target_rps for i in range(len(queries))]

        def worker(i: int):
            q = queries[i]
            scheduled = arrivals[i]
            delay = scheduled - time.perf_counter()
            if delay > 0:
                time.sleep(delay)

            with sem:
                t_req_start = time.perf_counter()

                # Retrieval and rerank can overlap across threads
                retrieved, t_retr = retrieve(q, top_k=retrieve_k)
                reranked, t_rer = rerank(q, retrieved, top_k=rerank_k)
                prompt = build_prompt(q, reranked)

                # Generation is the single GPU served section
                with gpu_lock:
                    _, t_gen = generate(
                        prompt,
                        max_new_tokens=gen_cfg["max_new_tokens"],
                        do_sample=gen_cfg["do_sample"],
                        temperature=gen_cfg["temperature"],
                        top_p=gen_cfg["top_p"],
                    )

                t_req_end = time.perf_counter()

                results.append({
                    "name": name,
                    "gen_mode": gen_mode,
                    "max_new_tokens": gen_cfg["max_new_tokens"],
                    "do_sample": gen_cfg["do_sample"],
                    "temperature": gen_cfg["temperature"],
                    "top_p": gen_cfg["top_p"],
                    "concurrency": concurrency,
                    "target_rps": target_rps,
                    "retrieval_ms": t_retr,
                    "rerank_ms": t_rer,
                    "generation_ms": t_gen,
                    "latency_ms": (t_req_end - t_req_start) * 1000.0,
                    # latency_ms starts after the slot is acquired; these two columns
                    # expose the time a request spent queued behind the concurrency cap.
                    "queue_wait_ms": max(0.0, (t_req_start - scheduled) * 1000.0),
                    "e2e_latency_ms": (t_req_end - scheduled) * 1000.0,
                })

        threads = [
            threading.Thread(target=worker, args=(i,), daemon=True)
            for i in range(len(queries))
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()

        end1 = time.perf_counter()
    finally:
        # Always stop sampling and release NVML, including on interrupt.
        sampler.stop()

    # A worker that raised leaves no row behind; surface that instead of silently
    # averaging energy and throughput over fewer requests.
    n_failed = len(queries) - len(results)
    if n_failed:
        import warnings
        warnings.warn(
            f"{n_failed} of {len(queries)} requests failed in run '{name}'; "
            "per-query metrics cover completed requests only",
            RuntimeWarning,
        )

    power_df = sampler.to_df()
    run_seconds = end1 - start0

    # Integrate power over time using simple trapezoid
    if len(power_df) >= 2:
        ts = power_df["t_s"].to_numpy()
        ps = power_df["power_w"].to_numpy()
        # np.trapz is deprecated in NumPy 2.x in favour of np.trapezoid; use whichever exists.
        trapezoid = getattr(np, "trapezoid", None) or np.trapz
        joules = float(trapezoid(ps, ts))
    else:
        joules = float("nan")

    res_df = pd.DataFrame(results)
    res_df["run_seconds"] = run_seconds
    res_df["run_joules"] = joules
    res_df["energy_j_per_query"] = joules / max(len(res_df), 1)

    achieved_qps = len(res_df) / run_seconds
    res_df["achieved_qps"] = achieved_qps

    return res_df, power_df

In [10]:
# Five base questions, repeated six times: 30 requests per run.
queries = 6 * [
    "What is CUDA and why is it useful?",
    "What is Triton Inference Server used for?",
    "Why do people use FAISS in RAG systems?",
    "What are Prometheus and Grafana used for?",
    "Explain dynamic batching in simple terms.",
]

print(f"Total queries: {len(queries)}")

# The two configurations use the same greedy decoding settings and differ only in
# which generation path serves the request.
configs = [
    {
        "name": config_name,
        "gen_mode": mode_name,
        "gen_cfg": {"max_new_tokens": 160, "do_sample": False, "temperature": 0.0, "top_p": 1.0},
    }
    for config_name, mode_name in (
        ("baseline_pipeline", "pipeline"),
        ("optimized_direct", "direct"),
    )
]

# Sweep axes for the benchmark grid.
concurrency_levels = [1, 2, 4, 8]
target_rps_levels = [0.5, 1.0]

Total queries: 30


In [11]:
# Accumulators for per-request frames and per-run summary rows.
all_runs, all_summaries = [], []

# Every sweep writes into its own timestamped folder under OUT_DIR.
timestamp = time.strftime("%Y%m%d_%H%M%S")
run_root = OUT_DIR / ("day5_triton_style_" + timestamp)
run_root.mkdir(parents=True, exist_ok=True)

def pct(x, p):
    """Return the `p`-th percentile (0-100) of the values in `x` as a Python float."""
    values = np.asarray(x, dtype=np.float64)
    percentile_value = np.percentile(values, p)
    return float(percentile_value)

# Full grid: every configuration x concurrency level x target request rate.
for cfg in configs:
    for conc in concurrency_levels:
        for rps in target_rps_levels:
            rps_tag = str(rps).replace('.', 'p')  # e.g. 0.5 -> "0p5", keeps file names dot-free
            run_name = f"{cfg['name']}_c{conc}_rps{rps_tag}"
            print("\nRunning:", run_name)

            res_df, power_df = run_fixed_rate(
                name=cfg["name"],
                gen_mode=cfg["gen_mode"],
                gen_cfg=cfg["gen_cfg"],
                queries=queries,
                concurrency=conc,
                target_rps=rps,
            )

            # Persist the raw per-request rows and the power trace for this run.
            res_path = run_root / f"{run_name}_requests.csv"
            power_path = run_root / f"{run_name}_power.csv"
            res_df.to_csv(res_path, index=False)
            power_df.to_csv(power_path, index=False)

            all_runs.append(res_df)

            lat = res_df["latency_ms"].to_list()
            # Run-level metrics are repeated on every row, so the first row is representative.
            run_level = {
                column: float(res_df[column].iloc[0])
                for column in ("achieved_qps", "energy_j_per_query", "run_joules", "run_seconds")
            }

            summary = {
                "run_name": run_name,
                "config": cfg["name"],
                "gen_mode": cfg["gen_mode"],
                "concurrency": conc,
                "target_rps": rps,
                "n_queries": int(len(res_df)),
                "p50_latency_ms": pct(lat, 50),
                "p95_latency_ms": pct(lat, 95),
                **run_level,
            }
            all_summaries.append(summary)

summary_df = pd.DataFrame(all_summaries).sort_values(by=["config", "target_rps", "concurrency"])
summary_csv = run_root / "day5_summary.csv"
summary_df.to_csv(summary_csv, index=False)

print("\nSaved run folder:", run_root)
print("Saved summary:", summary_csv)
summary_df


Running: baseline_pipeline_c1_rps0p5


You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset



Running: baseline_pipeline_c1_rps1p0





Running: baseline_pipeline_c2_rps0p5





Running: baseline_pipeline_c2_rps1p0





Running: baseline_pipeline_c4_rps0p5





Running: baseline_pipeline_c4_rps1p0





Running: baseline_pipeline_c8_rps0p5





Running: baseline_pipeline_c8_rps1p0





Running: optimized_direct_c1_rps0p5

Running: optimized_direct_c1_rps1p0

Running: optimized_direct_c2_rps0p5

Running: optimized_direct_c2_rps1p0

Running: optimized_direct_c4_rps0p5

Running: optimized_direct_c4_rps1p0

Running: optimized_direct_c8_rps0p5

Running: optimized_direct_c8_rps1p0

Saved run folder: /content/drive/MyDrive/PerfWattLab_RAG/outputs/day5_triton_style_20260212_044200
Saved summary: /content/drive/MyDrive/PerfWattLab_RAG/outputs/day5_triton_style_20260212_044200/day5_summary.csv


Unnamed: 0,run_name,config,gen_mode,concurrency,target_rps,n_queries,p50_latency_ms,p95_latency_ms,achieved_qps,energy_j_per_query,run_joules,run_seconds
0,baseline_pipeline_c1_rps0p5,baseline_pipeline,pipeline,1,0.5,30,3337.76418,5622.4396,0.300581,182.522803,5475.684084,99.806549
2,baseline_pipeline_c2_rps0p5,baseline_pipeline,pipeline,2,0.5,30,6260.144194,8553.873918,0.307527,192.927741,5787.832241,97.552282
4,baseline_pipeline_c4_rps0p5,baseline_pipeline,pipeline,4,0.5,30,11614.352442,15336.478418,0.303952,196.844268,5905.328025,98.699648
6,baseline_pipeline_c8_rps0p5,baseline_pipeline,pipeline,8,0.5,30,24117.112079,27610.164588,0.295007,197.62155,5928.646509,101.692573
1,baseline_pipeline_c1_rps1p0,baseline_pipeline,pipeline,1,1.0,30,3245.523251,5597.9456,0.301733,191.43761,5743.128297,99.425702
3,baseline_pipeline_c2_rps1p0,baseline_pipeline,pipeline,2,1.0,30,6337.186251,8800.233044,0.301662,195.455292,5863.658771,99.449146
5,baseline_pipeline_c4_rps1p0,baseline_pipeline,pipeline,4,1.0,30,12889.481663,20629.086873,0.284205,198.447823,5953.434682,105.557778
7,baseline_pipeline_c8_rps1p0,baseline_pipeline,pipeline,8,1.0,30,25828.095981,27646.390392,0.305759,195.959829,5878.79487,98.11637
8,optimized_direct_c1_rps0p5,optimized_direct,direct,1,0.5,30,2946.030817,4959.347461,0.345809,186.246742,5587.402248,86.752992
10,optimized_direct_c2_rps0p5,optimized_direct,direct,2,0.5,30,5292.314184,7601.260176,0.346327,185.943625,5578.308764,86.623278


In [12]:
def pick_best(df, metric, mode):
    """Return the row of `df` with the smallest or largest value of `metric` as a dict.

    Args:
        df: DataFrame holding one row per run.
        metric: Name of the column to optimise.
        mode: "min" to pick the smallest value, "max" to pick the largest.

    Raises:
        ValueError: if `mode` is neither "min" nor "max" (previously any other value
            was silently treated as "max").
    """
    if mode == "min":
        i = df[metric].idxmin()
    elif mode == "max":
        i = df[metric].idxmax()
    else:
        raise ValueError("mode must be min or max")
    return df.loc[i].to_dict()

# Winners per metric across every run in the sweep.
best_latency = pick_best(summary_df, "p50_latency_ms", "min")
best_throughput = pick_best(summary_df, "achieved_qps", "max")
best_energy = pick_best(summary_df, "energy_j_per_query", "min")

# Reference point: the baseline run with the lowest target rate, then lowest concurrency.
baseline_rows = summary_df[summary_df["config"] == "baseline_pipeline"]
baseline_default = baseline_rows.sort_values(["target_rps", "concurrency"]).iloc[0].to_dict()

# The report is a list of markdown fragments joined with newlines at the end.
report = [
    "# Day 5 Triton style evaluation report\n",
    "## What this emulates\n",
    "Model loading is done once, then warmup, then a fixed request rate workload is driven through a concurrency limited runner.\n",
    "Generation is guarded by a single GPU lock to emulate a single served model on one GPU.\n",
]

# Each of these sections is a heading followed by the selected run rendered as fenced JSON.
for section_heading, section_row in (
    ("## Baseline\n", baseline_default),
    ("## Best latency\n", best_latency),
    ("## Best throughput\n", best_throughput),
    ("## Best energy per query\n", best_energy),
):
    report.append(section_heading)
    report.append("```json\n" + json.dumps(section_row, indent=2) + "\n```\n")

report.extend([
    "## Recommendation\n",
    (
        "Use optimized_direct as the default served path, then choose concurrency based on your target request rate. "
        "If the goal is interactive latency, pick the run with lowest p50 latency. "
        "If the goal is batch throughput, pick the run with highest achieved QPS. "
        "Energy per query should be used as a tie breaker when latency targets are met.\n"
    ),
    "## Colab limitations\n",
    (
        "NVML reports GPU board power only. Sampling at 5 Hz can miss short spikes. "
        "Colab is shared, so clocks and background load vary. CPU and system power are not included. "
        "Time alignment uses host timestamps and has some jitter.\n"
    ),
])

report_text = "\n".join(report)
report_path = run_root / "day5_report.md"
report_path.write_text(report_text)

print("Saved report:", report_path)
print(report_text[:800])

Saved report: /content/drive/MyDrive/PerfWattLab_RAG/outputs/day5_triton_style_20260212_044200/day5_report.md
# Day 5 Triton style evaluation report

## What this emulates

Model loading is done once, then warmup, then a fixed request rate workload is driven through a concurrency limited runner.

Generation is guarded by a single GPU lock to emulate a single served model on one GPU.

## Baseline

```json
{
  "run_name": "baseline_pipeline_c1_rps0p5",
  "config": "baseline_pipeline",
  "gen_mode": "pipeline",
  "concurrency": 1,
  "target_rps": 0.5,
  "n_queries": 30,
  "p50_latency_ms": 3337.7641795000272,
  "p95_latency_ms": 5622.439600150096,
  "achieved_qps": 0.3005814791155597,
  "energy_j_per_query": 182.52280280660523,
  "run_joules": 5475.684084198157,
  "run_seconds": 99.80654858799994
}
```

## Best latency

```json
{
  "run_name": "optimized_direct_c1_rps1p0",
  "config": "optimized_dire
