<b>1) Enterprise code: Greedy + Beam (HuggingFace Transformers, decoder-only CausalLM)
What matters for certification

Greedy: num_beams=1, do_sample=False → fastest, deterministic, but can be repetitive / myopic.

Beam search: num_beams>1, do_sample=False → higher likelihood sequences, great for translation/summarization; can become generic/long unless tuned (length_penalty, early_stopping).</b>

In [None]:
import time
from dataclasses import dataclass
from typing import List, Optional, Literal, Dict, Any

import torch
from transformers import AutoTokenizer, AutoModelForCausalLM


DecodeMode = Literal["greedy", "beam"]


@dataclass
class DecodeConfig:
    """Decoding hyper-parameters for one generation request.

    The defaults describe a 4-beam search of at most 128 new tokens with no
    repetition controls. Values that cannot describe a valid request are
    rejected at construction time (see ``__post_init__``).

    Raises:
        ValueError: if ``max_new_tokens``, ``num_beams`` or
            ``num_return_sequences`` is below 1, or ``repetition_penalty``
            is not strictly positive.
    """

    # Length budget for the generated continuation.
    max_new_tokens: int = 128
    min_new_tokens: int = 0

    # Sampling-only knobs; greedy and beam decoding run with do_sample=False.
    temperature: float = 1.0  # ignored when do_sample=False, kept for completeness
    top_p: float = 1.0
    top_k: int = 0

    # Beam-search knobs
    num_beams: int = 4
    length_penalty: float = 1.0
    early_stopping: bool = True
    num_return_sequences: int = 1

    # Safety / quality knobs
    repetition_penalty: float = 1.0  # 1.0 is the neutral value
    no_repeat_ngram_size: int = 0

    # Runtime
    use_fp16: bool = True

    def __post_init__(self) -> None:
        """Fail fast on values that can never yield a valid generation."""
        rules = (
            ("max_new_tokens", self.max_new_tokens >= 1, "must be >= 1"),
            ("num_beams", self.num_beams >= 1, "must be >= 1"),
            ("num_return_sequences", self.num_return_sequences >= 1, "must be >= 1"),
            ("repetition_penalty", self.repetition_penalty > 0, "must be > 0"),
        )
        for field_name, is_valid, rule in rules:
            if not is_valid:
                raise ValueError(
                    f"DecodeConfig.{field_name} {rule}, "
                    f"got {getattr(self, field_name)!r}"
                )


class TextGenerator:
    """
    Enterprise-ish wrapper around a HuggingFace decoder-only CausalLM:
    - Handles tokenizer padding (left padding, pad token fallback)
    - Batches prompts
    - Measures latency + tokens/sec
    - Supports greedy and beam search deterministically
    """

    def __init__(self, model_name: str, device: Optional[str] = None):
        """Load tokenizer and model.

        Args:
            model_name: Hub id or local path passed to ``from_pretrained``.
            device: Target device string. Defaults to "cuda" when available,
                otherwise "cpu". A plain "cuda" lets ``device_map="auto"``
                place the weights; any other value (e.g. "cuda:1", "cpu")
                is honoured by moving the model there explicitly.
        """
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        on_cuda = str(self.device).startswith("cuda")

        self.tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name,
            torch_dtype=torch.float16 if on_cuda else None,
            device_map="auto" if self.device == "cuda" else None,
        )
        if self.device != "cuda":
            # Without device_map the weights load on CPU; move them so that
            # an explicit device such as "cuda:1" matches where inputs go.
            self.model.to(self.device)

        # Many decoder-only LMs (e.g., Llama) may not define a pad token by default.
        if self.tokenizer.pad_token_id is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        # Decoder-only models continue from the last position of each row, so
        # batched prompts must be padded on the left; right padding would make
        # shorter prompts continue from pad tokens.
        self.tokenizer.padding_side = "left"

        self.model.eval()

    @torch.inference_mode()
    def generate(
        self,
        prompts: List[str],
        mode: DecodeMode,
        cfg: Optional[DecodeConfig] = None,
    ) -> Dict[str, Any]:
        """Generate continuations for a batch of prompts.

        Args:
            prompts: Prompt strings, tokenized together as one padded batch.
            mode: "greedy" (single beam) or "beam" (uses the beam knobs of
                ``cfg``). Sampling is disabled in both modes.
            cfg: Decoding parameters; ``DecodeConfig()`` when omitted.

        Returns:
            Dict with keys ``mode``, ``texts`` (decoded ``out.sequences``, one
            entry per returned sequence), ``latency_s`` (wall time of the
            ``generate`` call), ``tokens_per_s_est`` and ``batch_size``.
            An empty ``prompts`` list yields an empty result without calling
            the model.

        Raises:
            ValueError: if ``mode`` is neither "greedy" nor "beam".
        """
        # Local import keeps this method self-contained.
        from contextlib import nullcontext

        cfg = cfg or DecodeConfig()

        # ---- Greedy vs Beam configuration ----
        if mode == "greedy":
            gen_kwargs = dict(
                do_sample=False,
                num_beams=1,
                num_return_sequences=1,
            )
        elif mode == "beam":
            gen_kwargs = dict(
                do_sample=False,
                num_beams=cfg.num_beams,
                num_return_sequences=cfg.num_return_sequences,
                length_penalty=cfg.length_penalty,
                early_stopping=cfg.early_stopping,
            )
        else:
            raise ValueError(f"Unknown mode: {mode}")

        if not prompts:
            # Nothing to tokenize; return a well-formed empty result.
            return {
                "mode": mode,
                "texts": [],
                "latency_s": 0.0,
                "tokens_per_s_est": 0.0,
                "batch_size": 0,
            }

        enc = self.tokenizer(
            prompts,
            return_tensors="pt",
            padding=True,
            truncation=True,
        )
        # Send inputs to wherever the model actually lives (relevant when
        # device_map="auto" chose the placement).
        input_device = self.model.device
        input_ids = enc["input_ids"].to(input_device)
        attention_mask = enc["attention_mask"].to(input_device)

        # Common generation knobs (work for both greedy & beam)
        gen_kwargs.update(
            max_new_tokens=cfg.max_new_tokens,
            min_new_tokens=cfg.min_new_tokens,
            repetition_penalty=cfg.repetition_penalty,
            no_repeat_ngram_size=cfg.no_repeat_ngram_size,
            pad_token_id=self.tokenizer.pad_token_id,
            eos_token_id=self.tokenizer.eos_token_id,
            return_dict_in_generate=True,
            output_scores=False,
        )

        # Optional fp16 autocast (CUDA only)
        on_cuda = str(self.device).startswith("cuda")
        if on_cuda and cfg.use_fp16:
            autocast_ctx = torch.autocast(device_type="cuda", dtype=torch.float16)
        else:
            autocast_ctx = nullcontext()

        if on_cuda:
            # Drain pending GPU work so it is not billed to this request.
            torch.cuda.synchronize()
        start = time.perf_counter()
        with autocast_ctx:
            out = self.model.generate(
                input_ids=input_ids,
                attention_mask=attention_mask,
                **gen_kwargs,
            )
        if on_cuda:
            torch.cuda.synchronize()
        end = time.perf_counter()

        # Decode
        sequences = out.sequences
        texts = self.tokenizer.batch_decode(sequences, skip_special_tokens=True)

        # Basic throughput estimate: padded new-token length times number of
        # returned rows, so rows that stopped early are over-counted.
        total_new_tokens = sequences.shape[1] - input_ids.shape[1]
        latency_s = end - start
        tok_per_s = (total_new_tokens * sequences.shape[0]) / max(latency_s, 1e-9)

        return {
            "mode": mode,
            "texts": texts,
            "latency_s": latency_s,
            "tokens_per_s_est": tok_per_s,
            "batch_size": len(prompts),
        }


if __name__ == "__main__":
    # Any CausalLM available locally will do: a hub id such as "gpt2",
    # or the path of a local Llama checkpoint.
    model_name = "gpt2"
    tg = TextGenerator(model_name)

    prompts = [
        "Write a concise explanation of beam search in LLM decoding.",
        "Give 3 bullet points on when greedy decoding is sufficient.",
    ]

    def _report(header, result):
        """Print the header, the timing line and both generated texts of one run."""
        first_text, second_text = result["texts"][0], result["texts"][1]
        print(header)
        print("latency_s:", result["latency_s"], "tok/s:", result["tokens_per_s_est"])
        print(first_text, "\n---\n", second_text)

    # Greedy run: default config apart from the shorter token budget.
    greedy_cfg = DecodeConfig(max_new_tokens=80)
    greedy = tg.generate(prompts, mode="greedy", cfg=greedy_cfg)
    _report("\n=== GREEDY ===", greedy)

    # Beam run: 4 beams with a length penalty below 1.
    beam_cfg = DecodeConfig(max_new_tokens=80, num_beams=4, length_penalty=0.8)
    beam = tg.generate(prompts, mode="beam", cfg=beam_cfg)
    _report("\n=== BEAM ===", beam)


<b>2) Enterprise code: Greedy + Beam in Triton + TensorRT-LLM (NVIDIA production stack)
What matters for certification

In TensorRT-LLM backend, beam search must be supported by:

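Engine build time: build the engine with --max_beam_width set to the largest beam width you intend to request (a value > 1 enables beam search). This applies to encoder-decoder builds and to decoder-only engines alike.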
NVIDIA Docs

Triton model config: max_beam_width and decoding_mode. Default behavior: if max_beam_width == 1, it defaults to top-k/top-p mode; otherwise defaults to beam search. 
NVIDIA Docs

At request time, you set input tensor beam_width:

beam_width = 1 → greedy

beam_width > 1 → beam search</b>

In [None]:
import numpy as np
import tritonclient.http as httpclient


def np_scalar(value, dtype):
    """Wrap a single value in a one-element, 1-D NumPy array of the given dtype."""
    single_item = (value,)
    return np.asarray(single_item, dtype=dtype)


def infer_trtllm(
    triton_url: str,
    model_name: str,
    input_ids: np.ndarray,          # shape: [B, S], dtype=int32
    input_lengths: np.ndarray,      # shape: [B, 1], dtype=int32
    request_output_len: int,
    beam_width: int = 1,            # 1 => greedy, >1 => beam search
    repetition_penalty: float = 1.0,
    presence_penalty: float = 0.0,
    frequency_penalty: float = 0.0,
):
    """
    Enterprise-ish skeleton for TRT-LLM Triton backend.
    You typically front this with preprocessing/postprocessing models,
    but the core decoding control (beam_width, penalties) is here.

    Every per-request control value is sent as a [B, 1] tensor so that its
    batch dimension matches ``input_ids`` / ``input_lengths``.
    NOTE(review): this assumes the deployed model config uses Triton batching
    (max_batch_size > 0); adapt the shapes if your model disables batching.

    Args:
        triton_url: Host:port of the Triton HTTP endpoint.
        model_name: Name of the deployed Triton model.
        input_ids: Token ids, shape [B, S]; coerced to int32.
        input_lengths: Prompt lengths, B values; coerced to int32 [B, 1].
        request_output_len: Number of tokens to generate (>= 1).
        beam_width: 1 for greedy, > 1 for beam search.
        repetition_penalty / presence_penalty / frequency_penalty:
            forwarded unchanged as FP32 tensors.

    Returns:
        Tuple ``(output_ids, sequence_length)`` as NumPy arrays.

    Raises:
        ValueError: on a non-2D ``input_ids``, a length count that does not
            match the batch size, or non-positive ``beam_width`` /
            ``request_output_len``.
    """
    if beam_width < 1:
        raise ValueError(f"beam_width must be >= 1, got {beam_width}")
    if request_output_len < 1:
        raise ValueError(f"request_output_len must be >= 1, got {request_output_len}")

    # set_data_from_numpy rejects arrays whose dtype differs from the declared
    # one, so normalise up front instead of failing on int64 tokenizer output.
    input_ids = np.ascontiguousarray(input_ids, dtype=np.int32)
    if input_ids.ndim != 2:
        raise ValueError(f"input_ids must have shape [B, S], got {input_ids.shape}")
    batch_size = input_ids.shape[0]
    # reshape raises ValueError when the number of lengths != batch size.
    input_lengths = np.ascontiguousarray(input_lengths, dtype=np.int32).reshape(batch_size, 1)

    def per_request(value, dtype):
        """Broadcast one scalar control value to a [B, 1] tensor."""
        return np.full((batch_size, 1), value, dtype=dtype)

    # (tensor name, Triton dtype, data). Names can vary by template/client;
    # adapt to your deployed model.
    tensor_specs = [
        # Core token inputs (often produced by the 'preprocessing' model in the ensemble).
        ("input_ids", "INT32", input_ids),
        ("input_lengths", "INT32", input_lengths),
        # Decoding controls
        ("beam_width", "INT32", per_request(beam_width, np.int32)),
        ("repetition_penalty", "FP32", per_request(repetition_penalty, np.float32)),
        ("presence_penalty", "FP32", per_request(presence_penalty, np.float32)),
        ("frequency_penalty", "FP32", per_request(frequency_penalty, np.float32)),
        ("request_output_len", "INT32", per_request(request_output_len, np.int32)),
    ]

    inputs = []
    for tensor_name, triton_dtype, data in tensor_specs:
        infer_input = httpclient.InferInput(tensor_name, list(data.shape), triton_dtype)
        infer_input.set_data_from_numpy(data)
        inputs.append(infer_input)

    # What outputs you ask for depends on your backend template
    outputs = [
        httpclient.InferRequestedOutput("output_ids"),
        httpclient.InferRequestedOutput("sequence_length"),
    ]

    client = httpclient.InferenceServerClient(url=triton_url, verbose=False)
    try:
        resp = client.infer(model_name=model_name, inputs=inputs, outputs=outputs)
        output_ids = resp.as_numpy("output_ids")
        seq_lens = resp.as_numpy("sequence_length")
    finally:
        # Release the HTTP connection pool even when the request fails.
        client.close()
    return output_ids, seq_lens


if __name__ == "__main__":
    # Toy request with B=1, S=4. Real token ids come from your
    # tokenizer/preprocessing model.
    input_ids = np.array([[101, 102, 103, 104]], dtype=np.int32)
    input_lengths = np.array([[4]], dtype=np.int32)

    # Arguments shared by the greedy and the beam request.
    shared_request = dict(
        triton_url="localhost:8000",
        model_name="tensorrt_llm",
        input_ids=input_ids,
        input_lengths=input_lengths,
        request_output_len=64,
    )

    # Greedy: beam_width=1
    out_ids_g, out_len_g = infer_trtllm(beam_width=1, **shared_request)
    print("GREEDY output_ids:", out_ids_g, "lens:", out_len_g)

    # Beam search: beam_width=4 plus a mild repetition penalty
    out_ids_b, out_len_b = infer_trtllm(
        beam_width=4,
        repetition_penalty=1.05,
        **shared_request,
    )
    print("BEAM output_ids:", out_ids_b, "lens:", out_len_b)


<b>A. Greedy Decoding (Fast, Low Memory)
Logic: Always pick the highest probability token.

Memory: 1 Sequence per User.</b>

In [4]:
import torch
import torch.nn.functional as F

def greedy_decoding(model, tokenizer, prompt, max_new_tokens=50):
    """Generate text by always appending the highest-scoring next token.

    The first forward pass processes the whole prompt; every later pass feeds
    only the newly chosen token together with the KV cache, so no position is
    processed twice.

    Args:
        model: Callable returning an object with ``logits`` and
            ``past_key_values``; accepts ``past_key_values`` and ``use_cache``.
        tokenizer: Callable tokenizer exposing ``eos_token_id`` and ``decode``.
        prompt: A single prompt string (batch size 1 is assumed by the EOS
            check below).
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The decoded prompt plus generated tokens.
    """
    # Run on the model's own device instead of assuming a GPU is present.
    device = getattr(model, "device", None)
    if device is None:
        device = next(model.parameters()).device

    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
    eos_token_id = tokenizer.eos_token_id

    past_key_values = None
    step_input = input_ids  # full prompt on the first step only

    with torch.no_grad():
        for _ in range(max_new_tokens):
            # 1. Forward pass: new tokens only, earlier positions come from the cache
            outputs = model(step_input, past_key_values=past_key_values, use_cache=True)

            # 2-3. Logits of the last position -> greedy (argmax) pick, shape [1, 1]
            next_token_id = outputs.logits[:, -1, :].argmax(dim=-1, keepdim=True)

            # 4. Append to the running sequence
            input_ids = torch.cat([input_ids, next_token_id], dim=-1)

            # 5. Carry the cache forward and feed just the new token next time
            past_key_values = outputs.past_key_values
            step_input = next_token_id

            # 6. Stop if EOS is generated
            if next_token_id.item() == eos_token_id:
                break

    return tokenizer.decode(input_ids[0])

<b>B. Beam Search (High Quality, High Memory)
Logic: Maintain top $K$ sequences.

Memory: $K$ Sequences per User (High VRAM usage).

Exam Key: Notice the Score Calculation with length_penalty.</b>

In [None]:
import math

def beam_search_step(model, input_ids, beam_width=4, length_penalty=1.0):
    """Illustrate the candidate-selection part of one beam-search step.

    Simplification: only the next-token log-probabilities are ranked; a real
    decoding loop would first add each beam's accumulated score.

    Args:
        model: Callable returning an object with ``logits``; the last
            position is used, giving one row of vocabulary scores per beam.
        input_ids: Current candidate sequences, one row per beam.
        beam_width: Number of (beam, token) candidates to keep.
        length_penalty: Exponent ``alpha`` of the length normaliser.

    Returns:
        ``(beam_indices, token_indices, adjusted_scores)`` for the kept
        candidates, where adjusted = log_prob / (((5 + length) / 6) ** alpha)
        and ``length`` is the sequence length after appending the new token.
    """
    with torch.no_grad():
        last_position_logits = model(input_ids).logits[:, -1, :]

        # Log-space scores, so that path scores can be combined by addition.
        log_probs = F.log_softmax(last_position_logits, dim=-1)
        num_tokens = last_position_logits.shape[-1]

        # Rank every (beam, token) pair in one flat pool and keep the best.
        best = torch.topk(log_probs.view(-1), k=beam_width)
        best_scores, flat_positions = best.values, best.indices

        # Recover (beam row, token column) from each flat position.
        beam_indices = torch.div(flat_positions, num_tokens, rounding_mode="floor")
        token_indices = flat_positions - beam_indices * num_tokens

        # Length normalisation: dividing by a factor that grows with length
        # keeps longer hypotheses from being penalised for having more terms.
        length_after_step = input_ids.shape[1] + 1
        normaliser = ((5 + length_after_step) / 6) ** length_penalty

        return beam_indices, token_indices, best_scores / normaliser

<b>2. The Enterprise Deployment (TensorRT-LLM)
In the real exam and production, you do not write loops. You configure the Executor.

This is how you enable Beam Search in a tensorrt_llm Python script.</b>

In [None]:
import tensorrt_llm
import torch
from tensorrt_llm.runtime import ModelRunner, SamplingConfig
from transformers import AutoTokenizer

# 1. Initialize the Optimized Runtime
runner = ModelRunner.from_dir("path/to/trt_engine_dir")

# The tokenizer must be the one the engine's checkpoint was built from.
tokenizer = AutoTokenizer.from_pretrained("path/to/hf_tokenizer_dir")

# Some decoder-only tokenizers define no pad token; fall back to EOS so that
# pad_id is never None.
pad_id = tokenizer.pad_token_id
if pad_id is None:
    pad_id = tokenizer.eos_token_id

# 2. Define Inputs
prompt = "Describe the architecture of a Transformer."
# One 1-D int32 tensor per prompt, passed below as a list of tensors.
input_ids = torch.tensor(tokenizer.encode(prompt), dtype=torch.int32)

# --- SCENARIO A: GREEDY DECODING ---
# Defined for comparison only; the run below uses beam_config.
greedy_config = SamplingConfig(
    end_id=tokenizer.eos_token_id,
    pad_id=pad_id,
    num_beams=1,      # Exam Key: num_beams=1 implies Greedy or Sampling (Top-K/P)
    top_k=1,          # Strict Greedy
    top_p=0.0
)

# --- SCENARIO B: BEAM SEARCH ---
# NOTE: the engine must have been built with max_beam_width >= num_beams.
beam_config = SamplingConfig(
    end_id=tokenizer.eos_token_id,
    pad_id=pad_id,
    num_beams=5,             # Exam Key: > 1 triggers Beam Search kernels
    length_penalty=1.2,      # Exam Key: > 1.0 encourages longer output
    early_stopping=True      # Presumably stops once num_beams finished hypotheses exist; confirm for your TRT-LLM version
)

# 3. Run Inference
# TRT-LLM handles the complex beam expansion/pruning in C++ kernels
outputs = runner.generate(
    batch_input_ids=[input_ids],
    max_new_tokens=100,
    sampling_config=beam_config
)

print(outputs)

<b>3. Triton Inference Server Integration
If you are using Triton (the preferred NVIDIA deployment method), you don't even write the Python code above. You pass parameters in the JSON Payload.</b>

In [6]:
# Example request payload, written as a Python dict literal so the cell
# evaluates and echoes it. The decoding controls (beam_width, length_penalty,
# repetition_penalty) are nested under "parameters" next to max_tokens.
{
  "text_input": "Explain quantum computing",
  "parameters": {
    "max_tokens": 128,
    "beam_width": 5,          
    "length_penalty": 1.5,    
    "repetition_penalty": 1.0 
  }
}

{'text_input': 'Explain quantum computing',
 'parameters': {'max_tokens': 128,
  'beam_width': 5,
  'length_penalty': 1.5,
  'repetition_penalty': 1.0}}