<a href="https://colab.research.google.com/github/ciro-greco/AI-engineering-IEOR4574E001/blob/main/Week_3_%E2%80%94_Sampling_Lab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================================================
# Week 3 — Sampling Lab
# ============================================================
# 🧠 Purpose
#   See and *feel* how sampling knobs change LLM behavior:
#   - temperature
#   - top_p (nucleus) and (optionally) top_k
#   - repetition penalties
#   - simple “constrained” outputs (JSON) with a validation loop
#   - test-time compute: best-of-n (re-ranking) and self-consistency (voting)
#
# ⚙️ Requirements
#   - Runs locally or in Google Colab (GPU recommended, CPU OK but slower)
#   - No paid API needed; we use a free HF model (TinyLlama)
#   - If running in a fresh Colab, add first:
#       !pip install -U transformers accelerate torch huggingface_hub matplotlib pandas
#
# 🔗 Slides tie-in
#   - “Sampling” (slides 28–31): temperature, top-p, repetition control,
#     constrained decoding, best-of-n, self-consistency, trade-offs.
# ============================================================

import os, time, math, random, json, statistics, re
from typing import Dict, Any, List, Optional, Tuple

import torch
import pandas as pd
import matplotlib.pyplot as plt

from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TextIteratorStreamer,  # lets us stream tokens to observe UX + TTFT/TPOT if desired
)

# ------------------------------------------------------------
# 0) Environment & model
# ------------------------------------------------------------
# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_ID = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"  # small + public + chat-tuned
SEED = 1234  # fixed seed when we want to reduce randomness across runs

# Echo the configuration so the notebook output records which device/model was used.
print(f"[Setup] device={DEVICE}, model={MODEL_ID}")

# (Optional) quick RNG seeding utility to make comparisons less noisy
def set_seed(seed: int = 1234):
    """Seed the Python and PyTorch random number generators.

    Calling this with the same ``seed`` before two generations makes their
    sampled tokens comparable, which is what the sweeps in this lab rely on.

    Args:
        seed: Value passed to ``random.seed``, ``torch.manual_seed`` and,
            when CUDA is available, ``torch.cuda.manual_seed_all``.
    """
    has_gpu = torch.cuda.is_available()
    torch.manual_seed(seed)
    random.seed(seed)
    if has_gpu:
        # Seed every visible CUDA device as well.
        torch.cuda.manual_seed_all(seed)

# Tokenizer & model load (the captured run output shows the files being fetched from the Hub)
tokenizer = AutoTokenizer.from_pretrained(MODEL_ID, use_fast=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_ID,
    # Half precision on GPU, full precision on CPU.
    # NOTE(review): the captured run output shows transformers warning that
    # `torch_dtype` is deprecated in favour of `dtype`; the old keyword is kept
    # here — confirm the minimum supported transformers version before renaming.
    torch_dtype=torch.float16 if DEVICE == "cuda" else torch.float32,
    low_cpu_mem_usage=True,
).to(DEVICE)
model.eval()  # switch the module to evaluation (inference) mode
print("[Load] Model ready.\n")

# ------------------------------------------------------------
# 1) Prompt formatting & generation helpers
# ------------------------------------------------------------
def chat_prompt(system: str, user: str) -> str:
    """Build the plain-text prompt used throughout this lab.

    The layout is three bracketed headers, each on its own line:
    ``[SYSTEM]`` followed by ``system``, ``[USER]`` followed by ``user``, and a
    trailing ``[ASSISTANT]`` header after which the model writes its reply.
    This is a deliberately minimal formatter; in production, prefer
    ``tokenizer.chat_template`` if available.

    Args:
        system: System instruction text.
        user: User message text.

    Returns:
        The assembled prompt string, ending with ``"[ASSISTANT]\\n"``.
    """
    system_section = f"[SYSTEM]\n{system}\n"
    user_section = f"[USER]\n{user}\n"
    return system_section + user_section + "[ASSISTANT]\n"

def generate_once(
    system: str,
    user: str,
    *,
    max_new_tokens: int = 128,
    temperature: float = 0.7,
    top_p: float = 0.9,
    top_k: Optional[int] = None,            # leave None unless you want hard caps
    repetition_penalty: Optional[float] = None, # >1 discourages repetition (e.g., 1.05–1.2)
    seed: Optional[int] = None,
    stream: bool = False,                   # turn on to *watch* tokens for UX; off for speed
) -> Dict[str, Any]:
    """
    Run one generation (blocking or streamed) with the core sampling knobs.

    Args:
        system: System instruction placed in the prompt.
        user: User message placed in the prompt.
        max_new_tokens: Upper bound on the number of generated tokens.
        temperature: Sampling temperature. Values <= 0 select greedy decoding
            instead of raising, so "temperature 0" can be used as the
            deterministic baseline.
        top_p: Nucleus-sampling threshold (only used when sampling).
        top_k: Optional hard cap on candidate tokens (only used when sampling).
            When None the keyword is not passed, so the library/model default
            applies (NOTE(review): transformers typically defaults to 50 —
            confirm for the installed version).
        repetition_penalty: Optional penalty; passed through only when set.
        seed: When given, `set_seed(seed)` is called before generating.
        stream: When True, print pieces as they arrive and report the time to
            the first piece; when False, generate silently.

    Returns:
        Dict with the generated text under "text" (completion only, without
        the prompt), wall-clock seconds under "elapsed_s", and the sampling
        settings echoed back under "temperature", "top_p", "top_k" and
        "repetition_penalty". Timings are illustrative, not a benchmark.
    """
    if seed is not None:
        set_seed(seed)

    prompt = chat_prompt(system, user)
    inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)
    # Number of prompt tokens; used below to cut the prompt off the output ids.
    prompt_len = inputs["input_ids"].shape[1]

    sampling = temperature > 0
    gen_kwargs = dict(
        **inputs,
        max_new_tokens=max_new_tokens,
        do_sample=sampling,                      # False → greedy decoding
        eos_token_id=tokenizer.eos_token_id,
    )
    if sampling:
        # Sampling-only knobs are omitted for greedy decoding, where they have no effect.
        gen_kwargs["temperature"] = temperature
        gen_kwargs["top_p"] = top_p
        if top_k is not None:
            gen_kwargs["top_k"] = int(top_k)
    if repetition_penalty is not None:
        gen_kwargs["repetition_penalty"] = float(repetition_penalty)

    t0 = time.time()

    if stream:
        import threading

        # Streaming path — nice for *seeing* how UX feels (TTFT, steady typing).
        # generate() runs in a worker thread while this thread consumes the streamer.
        streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
        gen_kwargs["streamer"] = streamer
        worker = threading.Thread(target=model.generate, kwargs=gen_kwargs)
        worker.start()

        pieces = []
        first = None
        for piece in streamer:
            if first is None:
                first = time.time()
                print(f"\n[TTFT] {(first - t0)*1000:.1f} ms\n")
            pieces.append(piece)
            print(piece, end="", flush=True)
        t1 = time.time()
        # The streamer is exhausted, so generation is finishing; reap the thread
        # instead of leaving it dangling.
        worker.join()
        text = "".join(pieces)
    else:
        # Non-streaming path — decode only the newly generated ids. Slicing by
        # prompt length is more robust than decoding everything and searching
        # the text for the "[ASSISTANT]" marker, and matches the streaming path
        # (which already skips the prompt).
        out = model.generate(**gen_kwargs)
        text = tokenizer.decode(out[0][prompt_len:], skip_special_tokens=True)
        t1 = time.time()

    return {
        "text": text,
        "elapsed_s": t1 - t0,
        "temperature": temperature,
        "top_p": top_p,
        "top_k": top_k,
        "repetition_penalty": repetition_penalty,
    }

# ------------------------------------------------------------
# 2) Temperature demo — “more random” vs “more deterministic”
# ------------------------------------------------------------
# Prompts for this demo: a fixed system instruction and an open-ended creative request.
SYSTEM = "You are a helpful, concise assistant."
USER   = "Give two original, *different* taglines for a CS student hackathon."

print("=== Temperature sweep (fixed top_p=0.9, seed=None to see diversity) ===")
# Hold top_p constant and vary only the temperature; print one completion per setting.
for temp in [0.2, 0.7, 1.0]:
    print(f"\n--- temperature={temp} ---")
    out = generate_once(
        SYSTEM, USER,
        temperature=temp,
        top_p=0.9,
        max_new_tokens=80,
        seed=None,      # None → allow natural randomness; set a seed to stabilize
        stream=False
    )
    print(out["text"])

# ✍️ Teaching note:
#   • temperature≈0.2 → sharp, repetitive, safe
#   • temperature≈0.7 → balanced
#   • temperature≈1.0 → imaginative, more risk of nonsense

# ------------------------------------------------------------
# 3) Nucleus sampling (top_p) demo — cap the “tail”
# ------------------------------------------------------------
print("\n=== top_p sweep (fixed temperature=0.7) ===")
# Hold temperature constant and vary only top_p; the seed is reset before every call.
for p in [0.7, 0.9, 0.95]:
    print(f"\n--- top_p={p} ---")
    out = generate_once(
        SYSTEM, USER,
        temperature=0.7,
        top_p=p,
        max_new_tokens=80,
        seed=SEED,     # fix seed to isolate effect of top_p only
        stream=False
    )
    print(out["text"])

# ✍️ Teaching note:
#   • lower top_p → keep only the most likely tokens (safer, less diverse)
#   • higher top_p → include more tail tokens (more variety, more risk)

# ------------------------------------------------------------
# 4) Repetition penalty — mitigate loops / echoing
# ------------------------------------------------------------
# Longer-form request used by the repetition demo; the two literals are joined
# into a single string by implicit concatenation.
LONG_USER = (
    "Write a short paragraph about the importance of testing in software engineering. "
    "Avoid repeating the same words too often."
)

def repetition_ratio(text: str) -> float:
    """
    Toy metric: fraction (0.0–1.0) of adjacent token pairs whose two tokens are identical.

    The text is lower-cased and split into word runs and single non-space
    symbols, so "The the" counts as a repeat and "!!" counts as two equal
    tokens. Texts with fewer than two tokens score 0.0. It's crude, but good
    enough to show the effect of repetition control.
    """
    tokens = re.findall(r"\w+|\S", text.lower())
    pair_count = len(tokens) - 1
    if pair_count < 1:
        return 0.0

    immediate_repeats = 0
    for previous, current in zip(tokens, tokens[1:]):
        if previous == current:
            immediate_repeats += 1
    return immediate_repeats / pair_count

print("\n=== repetition_penalty demo (higher discourages repeats) ===")
# Same prompt, seed and sampling settings each time; only the penalty changes
# (None means the keyword is not passed to generate at all).
for rp in [None, 1.05, 1.2]:
    out = generate_once(
        SYSTEM, LONG_USER,
        temperature=0.9, top_p=0.95,
        repetition_penalty=rp,
        max_new_tokens=140,
        seed=SEED,
        stream=False
    )
    rratio = repetition_ratio(out["text"])
    # Show the toy metric next to at most the first 800 characters of the output.
    # NOTE(review): the symbol after "repetition_ratio" in the string below looks
    # mis-encoded (probably "≈") — confirm against the original notebook.
    print(f"\n--- repetition_penalty={rp} | repetition_ratio‚âà{rratio:.3f} ---\n{out['text'][:800]}")

# ✍️ Teaching note:
#   • repetition_penalty > 1 pushes the model away from tokens that already appear
#     in the prompt or in the output so far
#   • too high → odd phrasing; too low → loops/echoing

# ------------------------------------------------------------
# 5) Simple “constrained” JSON output with validation loop
# ------------------------------------------------------------
# There’s no native JSON grammar here, so we’ll do an *application-level* loop:
#  1) Ask for JSON with a tiny schema
#  2) Try json.loads
#  3) If invalid, ask the model to fix it once (or twice)
# Request text describing the tiny schema: a 'title' plus a 3-item 'bullets' list.
JSON_USER = (
    "Return a JSON object with keys 'title' and 'bullets' where 'bullets' is a list of 3 short strings. "
    "Do not include prose outside JSON."
)

def json_request(
    system: str, user: str, max_attempts: int = 2
) -> Tuple[Dict[str, Any], str]:
    """
    Ask the model for JSON and validate it, with at most `max_attempts` repairs.

    The first call sends `user` as-is. If the reply cannot be parsed, or does
    not match the tiny schema (a dict with 'title' and a list under
    'bullets'), the next call repeats the original request together with the
    rejected output and the reason it was rejected, so the model actually has
    something to fix.

    Args:
        system: System instruction forwarded to `generate_once`.
        user: The original JSON request.
        max_attempts: Number of repair rounds allowed after the first try
            (total calls = max_attempts + 1).

    Returns:
        (parsed_json_or_empty_dict, raw_text) where raw_text is the last model
        output, or "" if no call was made (negative `max_attempts`).
    """
    out = ""
    prompt = user
    # One initial request plus up to `max_attempts` repair rounds.
    for attempt in range(max_attempts + 1):
        out = generate_once(
            system, prompt,
            temperature=0.4, top_p=0.9, max_new_tokens=180,
            # Shift the seed per attempt: a fixed seed with an unchanged prompt
            # would just replay the same failing sample.
            seed=SEED + attempt,
        )["text"].strip()
        # Try to extract JSON if there's extra chatter (defensive)
        m = re.search(r"\{.*\}", out, re.DOTALL)
        candidate = m.group(0) if m else out
        try:
            obj = json.loads(candidate)
        except json.JSONDecodeError as err:
            problem = f"it is not valid JSON ({err.msg})"
        else:
            # Check tiny schema
            if isinstance(obj, dict) and "title" in obj and "bullets" in obj and isinstance(obj["bullets"], list):
                return obj, out
            problem = "it does not match the requested keys ('title' and a list 'bullets')"
        # Build the repair prompt: original task + rejected output + what to do.
        prompt = (
            f"{user}\n\n"
            f"Your previous output was rejected because {problem}:\n{out}\n\n"
            "Please fix the JSON so it is strictly valid and contains only the object (no extra text)."
        )
    return {}, out  # failed after max attempts

print("\n=== simple constrained JSON with a validation/repair loop ===")
# `parsed` is {} when every attempt failed; `raw` is the last model output.
parsed, raw = json_request(SYSTEM, JSON_USER, max_attempts=2)
print("Parsed JSON:", parsed)
# Only the first 500 characters of the raw output are shown.
print("\nRaw model output (for inspection):\n", raw[:500])

# ✍️ Teaching note:
#   • This emulates “constrained decoding” from the *system’s* perspective.
#   • True grammar-constrained decoding needs a decoding-time grammar; here we teach students
#     how to build a *robust* loop when the runtime lacks grammar support.

# ------------------------------------------------------------
# 6) Best-of-n (test-time compute) — multiple samples + simple re-rank
# ------------------------------------------------------------
# Scenario: we want a catchy tagline and will pick the “best” by an app-defined score.
# “Best-of-n” improves quality but costs more latency/tokens.

# Request sent once per candidate in the best-of-n demo.
TAGLINE_USER = "Suggest one short, catchy tagline for a university hackathon. Keep it under 8 words."

def tagline_score(s: str) -> float:
    """
    Heuristic tagline scorer (higher is better).

    Components:
      + length closeness: 1.0 at exactly 5 words, falling linearly to 0.0
      + 0.2 bonus for each of the exact words "hack", "build", "code" present
        (whole-word match, so "hackathon" does not count as "hack")
      - 0.2 penalty when the raw string is 60 characters or longer
    """
    tokens = re.findall(r"\w+", s.lower())

    target_words = 5
    deviation = abs(len(tokens) - target_words) / target_words
    closeness = max(0.0, 1.0 - deviation)

    hits = 0
    for keyword in ("hack", "build", "code"):
        if keyword in tokens:
            hits += 1
    bonus = 0.2 * hits

    # Long lines are penalised based on character count, not word count.
    if len(s) < 60:
        long_penalty = 0.0
    else:
        long_penalty = 0.2
    return closeness + bonus - long_penalty

def best_of_n_tagline(
    n: int = 5,
    temperature: float = 0.9,
    top_p: float = 0.95,
) -> Dict[str, Any]:
    """
    Sample n candidate taglines and pick the highest-scoring by `tagline_score`.

    Each candidate is whitespace-stripped once, and that same stripped text is
    stored, scored and counted, so the reported score always describes the
    string the caller sees.

    Args:
        n: Number of candidates to sample; must be at least 1.
        temperature: Sampling temperature forwarded to `generate_once`.
        top_p: Nucleus threshold forwarded to `generate_once`.

    Returns:
        Dict with "winner" and "winner_score" (first candidate with the top
        score), the full "candidates" and "scores" lists, "avg_latency_s"
        (mean wall-clock time per sample) and "cost_proxy_tokens" (a crude
        count of words/symbols across all candidates, not real model tokens).

    Raises:
        ValueError: If `n` is smaller than 1.
    """
    if n < 1:
        raise ValueError(f"n must be at least 1, got {n}")

    candidates, scores = [], []
    total_time = 0.0
    total_tokens = 0
    for _ in range(n):
        out = generate_once(
            SYSTEM, TAGLINE_USER,
            temperature=temperature, top_p=top_p,
            max_new_tokens=24,
            seed=None,       # allow diversity
            stream=False
        )
        candidate = out["text"].strip()
        candidates.append(candidate)
        scores.append(tagline_score(candidate))
        total_time += out["elapsed_s"]
        # crude token proxy: words + punctuation marks (not exact tokens, but fine for teaching)
        total_tokens += len(re.findall(r"\w+|\S", candidate))
    # max() keeps the earliest index on ties.
    best_idx = max(range(n), key=scores.__getitem__)
    return {
        "winner": candidates[best_idx],
        "winner_score": scores[best_idx],
        "candidates": candidates,
        "scores": scores,
        "avg_latency_s": total_time / n,
        "cost_proxy_tokens": total_tokens,  # larger n → more sampled text → cost↑
    }

print("\n=== best-of-n demo (n=5) ===")
# Draw five candidates, then report the winner, every score, and the cost figures.
bo = best_of_n_tagline(n=5, temperature=0.9, top_p=0.95)
print("Winner:", bo["winner"])
print("Scores:", [round(x, 2) for x in bo["scores"]])
print("Avg latency (s):", round(bo["avg_latency_s"], 2))
print("Cost proxy (tokens across all candidates):", bo["cost_proxy_tokens"])

# ✍️ Teaching note:
#   • Quality ↑ with n, but cost & latency ↑ linearly.
#   • This is exactly the “test-time compute” trade-off from the slides.

# ------------------------------------------------------------
# 7) Self-consistency (reasoning) — vote across multiple samples
# ------------------------------------------------------------
# Scenario: The question has a *single* correct numeric answer.
# We sample K times and choose the majority numeric value.
# Word problem used for the vote (12 teams × 3 prototypes × 2 mentors = 72).
REASON_USER = (
    "A class has 12 teams. Each team builds 3 prototypes. Each prototype needs 2 mentors. "
    "How many mentors are needed in total? Give only the number."
)

def extract_number(text: str) -> Optional[int]:
    """Return the first run of digits in ``text`` as an int, or None if there is none.

    Only the digits themselves are read: a leading minus sign is ignored and a
    decimal or comma ends the run (so "-5" gives 5 and "1,000" gives 1).
    """
    match = re.search(r"\d+", text)
    if match is None:
        return None
    return int(match.group(0))

def self_consistency_vote(k: int = 5, temperature: float = 0.8, top_p: float = 0.9):
    """
    Sample K answers to REASON_USER and take a plurality vote over the numbers found.

    Each sample is reduced to a number with `extract_number`; samples with no
    number are kept in the raw list as None but do not vote. The winner is the
    most frequent number (the earliest-seen one on ties), or None when no
    sample contained a number.

    Returns:
        Dict with "samples" (one entry per draw), "vote_counts" (number → count)
        and "winner".
    """
    samples = [
        extract_number(
            generate_once(
                SYSTEM, REASON_USER,
                temperature=temperature, top_p=top_p,
                max_new_tokens=32,
                seed=None, stream=False,
            )["text"]
        )
        for _ in range(k)
    ]

    # Tally only the samples that produced a number.
    tally = {}
    for value in samples:
        if value is None:
            continue
        tally.setdefault(value, 0)
        tally[value] += 1

    if tally:
        winner = max(tally, key=tally.get)
    else:
        winner = None
    return {"samples": samples, "vote_counts": tally, "winner": winner}

print("\n=== self-consistency demo (k=5) ===")
# Five independent samples; show the extracted numbers, the tally, and the winner.
sc = self_consistency_vote(k=5, temperature=0.8, top_p=0.9)
print("Raw samples:", sc["samples"])
print("Vote counts:", sc["vote_counts"])
print("Consensus (winner):", sc["winner"])

# ✍️ Teaching note:
#   • Self-consistency increases robustness on reasoning, but costs k× tokens/latency.
#   • Good when correctness matters more than speed/cost.

# ------------------------------------------------------------
# 8) Summary / Suggested defaults (for the slide)
# ------------------------------------------------------------
# Static recap of the settings explored above; nothing is computed here.
# NOTE(review): the dash in "(3...5 samples)" and the emoji/dash in the final
# message look mis-encoded in the string literals below — confirm against the
# original notebook before changing them.
print("\n=== Suggested practical defaults (good first try) ===")
print("- temperature: 0.7, top_p: 0.9 (balanced)")
print("- leave top_k unset (use top_p alone) unless you need hard caps")
print("- enable light repetition_penalty (e.g., 1.05) if you see loops/echoing")
print("- for tool/JSON outputs: use schemas + validation/repair loop")
print("- for creative tasks: raise temperature and/or top_p; consider best-of-3")
print("- for reasoning tasks: try self-consistency (3‚Äì5 samples) with a vote")
print("\n‚úÖ Sampling lab complete ‚Äî you experimented with the key knobs and trade-offs.")


[Setup] device=cpu, model=TinyLlama/TinyLlama-1.1B-Chat-v1.0


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json: 0.00B [00:00, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/551 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/608 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


model.safetensors:   0%|          | 0.00/2.20G [00:00<?, ?B/s]