# 🔴 **Import**

In [1]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from IPython.display import HTML
import torch

# 🔴 **Init**

In [2]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
device

'cpu'

# 🔴 **Model & Tokenizer**

In [3]:
# Load the pretrained GPT-2 tokenizer and language-model head,
# then place the model on the device selected above.
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
model = GPT2LMHeadModel.from_pretrained("gpt2").to(device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

# 🔴 Methods

### 🟡 Top-k sampling (`multinomial`, `n_rep` parallel samples, end-of-text stopping)

In [None]:
def generate_top_k(model, tokenizer, prompt, n_rep=5, max_seq_len=128, T=0.9, top_k=10, device='cuda', seed=42):
    """Sample `n_rep` continuations of `prompt` with temperature-scaled top-k sampling.

    Args:
        model: Causal LM; called as `model(ids)` and must return an object with
            `.logits` of shape [B, T, vocab_size].
        tokenizer: Provides `encode(str) -> list[int]` and `batch_decode(list[list[int]])`.
        prompt (str): Text to continue.
        n_rep (int): Number of sequences sampled in parallel.
        max_seq_len (int): Maximum total length (prompt + generated) in tokens.
        T (float): Softmax temperature; must be non-zero.
        top_k (int): Number of highest-probability tokens to sample from
            (clamped to the vocabulary size).
        device (str): Device the token tensors and the RNG live on.
        seed (int): Seed for the sampling generator.

    Returns:
        list[str]: One decoded string per sequence. Each sequence is cut just
        before the first *generated* end-of-text token; the prompt is always kept.
    """
    prompt_ids = tokenizer.encode(prompt)
    prompt_len = len(prompt_ids)

    # int64 ids: matches the dtype of the sampled ids appended below.
    inputs = torch.tensor(prompt_ids, dtype=torch.long, device=device)  # [T]
    inputs = inputs.unsqueeze(0).repeat(n_rep, 1)  # [B, T] with B = n_rep

    model.eval()

    end_token_id = tokenizer.encode('<|endoftext|>')[0]
    finished = torch.zeros(n_rep, dtype=torch.bool, device=device)  # [B]

    # Dedicated generator so results are reproducible for a given seed.
    sample_rng = torch.Generator(device=device)
    sample_rng.manual_seed(seed)

    with torch.no_grad():
        while inputs.shape[-1] < max_seq_len and not finished.all():
            next_logits = model(inputs).logits[:, -1, :]  # [B, vocab_size]
            probs = torch.softmax(next_logits / T, dim=-1)

            # torch.topk raises if k exceeds the vocabulary size, so clamp it.
            k = min(top_k, probs.shape[-1])
            topk_probs, topk_indices = torch.topk(probs, k=k, dim=-1)  # [B, k]

            # multinomial accepts unnormalised weights; pick a slot, then map it
            # back to the real token id.
            choice = torch.multinomial(topk_probs, 1, generator=sample_rng)  # [B, 1]
            ids = torch.gather(topk_indices, -1, choice)  # [B, 1]

            finished |= (ids.squeeze(1) == end_token_id)

            # Finished rows are padded with the end token to keep the batch rectangular.
            ids[finished.unsqueeze(1)] = end_token_id

            inputs = torch.cat((inputs, ids), dim=-1)  # [B, T+1]

    # Trim at the first end token in the GENERATED part only, so an end token
    # that is part of the prompt never truncates the prompt itself.
    final_outputs = []
    for sequence in inputs.tolist():
        generated = sequence[prompt_len:]
        if end_token_id in generated:
            sequence = sequence[:prompt_len + generated.index(end_token_id)]
        final_outputs.append(sequence)

    return tokenizer.batch_decode(final_outputs)

### 🟡 Top-p (nucleus) sampling (`multinomial`, `n_rep` parallel samples, end-of-text stopping)

In [None]:
def generate_top_p(model, tokenizer, prompt, n_rep=3, max_seq_len=256, T=0.9, top_p=0.9, device='cuda', seed=42):
    """Sample `n_rep` continuations of `prompt` with temperature-scaled top-p (nucleus) sampling.

    At every step the nucleus is the smallest set of highest-probability tokens
    whose cumulative probability reaches `top_p`; the token that crosses the
    threshold is part of the nucleus, and at least one token is always kept.

    Args:
        model: Causal LM; called as `model(ids)` and must return an object with
            `.logits` of shape [B, T, vocab_size].
        tokenizer: Provides `encode(str) -> list[int]` and `batch_decode(list[list[int]])`.
        prompt (str): Text to continue.
        n_rep (int): Number of sequences sampled in parallel.
        max_seq_len (int): Maximum total length (prompt + generated) in tokens.
        T (float): Softmax temperature; must be non-zero.
        top_p (float): Cumulative-probability threshold of the nucleus.
        device (str): Device the token tensors and the RNG live on.
        seed (int): Seed for the sampling generator.

    Returns:
        list[str]: One decoded string per sequence. Each sequence is cut just
        before the first *generated* end-of-text token; the prompt is always kept.
    """
    prompt_ids = tokenizer.encode(prompt)
    prompt_len = len(prompt_ids)

    # int64 ids: matches the dtype of the sampled ids appended below.
    inputs = torch.tensor(prompt_ids, dtype=torch.long, device=device)  # [T]
    inputs = inputs.unsqueeze(0).repeat(n_rep, 1)  # [B, T] with B = n_rep

    end_token_id = tokenizer.encode('<|endoftext|>')[0]

    model.eval()
    sample_rng = torch.Generator(device=device)
    sample_rng.manual_seed(seed)

    # Tracks which sequences have already emitted the end token.
    finished = torch.zeros(n_rep, dtype=torch.bool, device=device)  # [B]

    with torch.no_grad():
        while inputs.shape[-1] < max_seq_len and not finished.all():
            next_logits = model(inputs).logits[:, -1, :]  # [B, vocab_size]
            probs = torch.softmax(next_logits / T, dim=-1)

            sorted_probs, sorted_indices = torch.sort(probs, descending=True, dim=-1)
            cumulative_probs = torch.cumsum(sorted_probs, dim=-1)

            # Probability mass that precedes each token in sorted order. A token
            # belongs to the nucleus while that preceding mass is still below
            # top_p, which includes the token that crosses the threshold.
            mass_before = torch.zeros_like(cumulative_probs)
            mass_before[:, 1:] = cumulative_probs[:, :-1]
            keep = mass_before < top_p
            keep[:, 0] = True  # never leave a row without candidates

            # Zero out everything outside the nucleus; multinomial renormalises
            # the remaining weights implicitly.
            nucleus = sorted_probs.masked_fill(~keep, 0.0)
            choice = torch.multinomial(nucleus, 1, generator=sample_rng)  # [B, 1]
            ids = torch.gather(sorted_indices, -1, choice)  # [B, 1]

            finished |= (ids.squeeze(1) == end_token_id)

            # Finished rows are padded with the end token to keep the batch rectangular.
            ids[finished.unsqueeze(1)] = end_token_id

            inputs = torch.cat((inputs, ids), dim=-1)  # [B, T+1]

    # Trim at the first end token in the GENERATED part only, so an end token
    # that is part of the prompt never truncates the prompt itself.
    final_outputs = []
    for sequence in inputs.tolist():
        generated = sequence[prompt_len:]
        if end_token_id in generated:
            sequence = sequence[:prompt_len + generated.index(end_token_id)]
        final_outputs.append(sequence)

    return tokenizer.batch_decode(final_outputs)

### 🟡 `beam search`

In [None]:
def generate_beam_search(model, tokenizer, prompt, beam_width=3, max_seq_len=256, device="cuda"):
    """Continue `prompt` with beam search and return the highest-scoring sequence.

    Beams are scored by the sum of next-token log-probabilities. A beam is
    finished once it has *generated* the end-of-text token; finished beams are
    carried forward unchanged.

    Args:
        model: Causal LM; called as `model(ids)` and must return an object with
            `.logits` of shape [1, T, vocab_size].
        tokenizer: Provides `encode(str) -> list[int]` and `decode(list[int]) -> str`.
        prompt (str): Text to continue.
        beam_width (int): Number of beams kept after every step.
        max_seq_len (int): Maximum total length (prompt + generated) in tokens.
        device (str): Device the token tensors live on.

    Returns:
        str: Decoded text of the best beam (including a trailing end-of-text
        token if the beam generated one).
    """
    input_ids = torch.tensor(tokenizer.encode(prompt), dtype=torch.long, device=device).unsqueeze(0)  # [1, T]
    prompt_len = input_ids.shape[-1]

    end_token_id = tokenizer.encode('<|endoftext|>')[0]

    # Each beam is (token tensor [1, T], cumulative log-prob, finished flag).
    # The flag is tracked explicitly so a prompt that merely ends with the end
    # token is still extended.
    beams = [(input_ids, 0.0, False)]

    model.eval()

    with torch.no_grad():
        # Every unfinished beam grows by exactly one token per step, so counting
        # steps bounds the length of ALL live beams, not just the current best.
        for _ in range(max_seq_len - prompt_len):
            candidates = []
            for seq, score, done in beams:
                if done:
                    candidates.append((seq, score, True))
                    continue

                # log_softmax is the numerically stable form of log(softmax(x)).
                log_probs = torch.log_softmax(model(seq).logits[:, -1, :], dim=-1)  # [1, vocab_size]

                k = min(beam_width, log_probs.shape[-1])
                top_log_probs, top_ids = torch.topk(log_probs, k, dim=-1)  # [1, k]

                for log_prob, token_id in zip(top_log_probs[0].tolist(), top_ids[0].tolist()):
                    new_seq = torch.cat([seq, seq.new_tensor([[token_id]])], dim=1)
                    candidates.append((new_seq, score + log_prob, token_id == end_token_id))

            # Keep the `beam_width` best candidates (stable sort: ties keep insertion order).
            beams = sorted(candidates, key=lambda c: c[1], reverse=True)[:beam_width]

            # Log-probabilities are <= 0, so a live beam can never overtake a
            # finished beam that is already on top: the search is decided.
            if beams[0][2]:
                break

    return tokenizer.decode(beams[0][0][0].tolist())

### 🟡 `beam search` & `n-gram`

In [None]:
def has_repeat_ngram(token_sequence, n):
    """
    Report whether any n-gram occurs more than once in a token sequence.

    Args:
        token_sequence (list of int): Token IDs to scan.
        n (int): Length of the n-gram window (2 = bigrams, 3 = trigrams, ...).

    Returns:
        bool: True as soon as a window of `n` consecutive tokens is seen a
        second time; False if every window is unique or the sequence is
        shorter than `n`.
    """
    total = len(token_sequence)
    if total < n:
        return False

    observed = set()
    start = 0
    # Slide a window of width n over the sequence, remembering each window seen.
    while start <= total - n:
        window = tuple(token_sequence[start:start + n])
        if window in observed:
            return True
        observed.add(window)
        start += 1
    return False

In [None]:
def generate_beam_search_ngram(model, tokenizer, prompt, beam_width=3, max_seq_len=50, ngram_size=3, device="cuda"):
    """Beam search that refuses to generate a token completing an already-seen n-gram.

    Beams are scored by the sum of next-token log-probabilities. For every live
    beam the top `2 * beam_width` tokens are examined in order of probability
    and up to `beam_width` of them that do not repeat an n-gram are kept.

    Args:
        model: Causal LM; called as `model(ids)` and must return an object with
            `.logits` of shape [1, T, vocab_size].
        tokenizer: Provides `encode(str) -> list[int]` and `decode(list[int]) -> str`.
        prompt (str): Text to continue.
        beam_width (int): Number of beams kept after every step.
        max_seq_len (int): Maximum total length (prompt + generated) in tokens.
        ngram_size (int): Size of the n-grams that may not repeat; values <= 0
            disable the check.
        device (str): Device the token tensors live on.

    Returns:
        str: Decoded text of the best beam (including a trailing end-of-text
        token if the beam generated one).
    """

    def _completes_seen_ngram(tokens, n):
        """True if the last `n` tokens of `tokens` already occur earlier in it."""
        if n <= 0 or len(tokens) <= n:
            return False
        tail = tuple(tokens[-n:])
        return any(tuple(tokens[i:i + n]) == tail for i in range(len(tokens) - n))

    input_ids = torch.tensor(tokenizer.encode(prompt), dtype=torch.long, device=device).unsqueeze(0)  # [1, T]
    prompt_len = input_ids.shape[-1]

    end_token_id = tokenizer.encode('<|endoftext|>')[0]

    # Each beam is (token tensor [1, T], cumulative log-prob, finished flag).
    beams = [(input_ids, 0.0, False)]

    model.eval()

    with torch.no_grad():
        # Every unfinished beam grows by exactly one token per step, so counting
        # steps bounds the length of ALL live beams, not just the current best.
        for _ in range(max_seq_len - prompt_len):
            candidates = []
            for seq, score, done in beams:
                if done:
                    candidates.append((seq, score, True))
                    continue

                # log_softmax is the numerically stable form of log(softmax(x)).
                log_probs = torch.log_softmax(model(seq).logits[:, -1, :], dim=-1)  # [1, vocab_size]

                # Look at twice the beam width so blocked tokens can be replaced.
                k = min(beam_width * 2, log_probs.shape[-1])
                top_log_probs, top_ids = torch.topk(log_probs, k=k, dim=-1)  # [1, k]

                prefix = seq[0].tolist()
                added = 0
                for log_prob, token_id in zip(top_log_probs[0].tolist(), top_ids[0].tolist()):
                    # Only the n-gram ending in the new token is checked, so
                    # repeats already present in the prompt do not block
                    # every candidate.
                    if _completes_seen_ngram(prefix + [token_id], ngram_size):
                        continue

                    new_seq = torch.cat([seq, seq.new_tensor([[token_id]])], dim=1)
                    candidates.append((new_seq, score + log_prob, token_id == end_token_id))
                    added += 1

                    # Stop adding candidates once the beam width limit is reached.
                    if added >= beam_width:
                        break

            # Every continuation was blocked: keep the beams from the previous step.
            if not candidates:
                break

            # Keep the `beam_width` best candidates (stable sort: ties keep insertion order).
            beams = sorted(candidates, key=lambda c: c[1], reverse=True)[:beam_width]

            # Log-probabilities are <= 0, so a live beam can never overtake a
            # finished beam that is already on top: the search is decided.
            if beams[0][2]:
                break

    return tokenizer.decode(beams[0][0][0].tolist())

# 🔴 Generate

In [None]:
# Input file with one prompt per line, and the report file written by main().
PROMPT_FILE = "default_prompts.txt"
OUTPUT_FILE = "outputs.txt"

# Decoding strategies to compare. Each entry maps a display label to a
# one-argument callable taking the prompt text; insertion order is the order
# in which the results are written to the report.
methods = {
    "Top-k": lambda text: generate_top_k(
        model=model, tokenizer=tokenizer, prompt=text,
        n_rep=1, max_seq_len=100, T=1.0, top_k=50, device=device, seed=42,
    ),
    "Top-p": lambda text: generate_top_p(
        model=model, tokenizer=tokenizer, prompt=text,
        n_rep=1, max_seq_len=100, T=1.0, top_p=0.92, device=device, seed=42,
    ),
    "Top-p + temp ↑": lambda text: generate_top_p(
        model=model, tokenizer=tokenizer, prompt=text,
        n_rep=1, max_seq_len=100, T=1.3, top_p=0.92, device=device, seed=42,
    ),
    "Top-k + temp ↓": lambda text: generate_top_k(
        model=model, tokenizer=tokenizer, prompt=text,
        n_rep=1, max_seq_len=100, T=0.7, top_k=40, device=device, seed=42,
    ),
    "Beam": lambda text: generate_beam_search(
        model=model, tokenizer=tokenizer, prompt=text,
        beam_width=4, max_seq_len=100, device=device,
    ),
    "Beam + N-gram": lambda text: generate_beam_search_ngram(
        model=model, tokenizer=tokenizer, prompt=text,
        beam_width=5, max_seq_len=100, ngram_size=3, device=device,
    ),
}

def load_prompts(path):
    """Read prompts from a UTF-8 text file, one prompt per line.

    Blank lines and comment lines are skipped. A line is a comment when its
    first non-whitespace character is `#`, so indented comments are ignored too.

    Args:
        path (str): Path of the prompt file.

    Returns:
        list[str]: Prompts with surrounding whitespace removed, in file order.
    """
    prompts = []
    with open(path, "r", encoding="utf-8") as f:
        for raw_line in f:
            text = raw_line.strip()
            # Test the stripped text so leading whitespace cannot hide a '#'.
            if text and not text.startswith("#"):
                prompts.append(text)
    return prompts

def main():
    """Run every decoding method on every prompt and write a plain-text report.

    Prompts are read from PROMPT_FILE and the report is written to OUTPUT_FILE
    (overwritten on each run). A method that raises is reported as FAILED and
    the run continues with the next method.
    """
    prompts = load_prompts(PROMPT_FILE)

    with open(OUTPUT_FILE, "w", encoding="utf-8") as out:
        for idx, prompt in enumerate(prompts, start=1):
            out.write(f"\n{'=' * 80}\n")
            out.write(f"[Prompt {idx}]: {prompt}\n")
            out.write(f"{'-' * 80}\n")

            for name, func in methods.items():
                try:
                    result = func(prompt)
                    # Sampling methods return a list of strings and beam search
                    # returns one string. Write the text itself rather than a
                    # Python list repr with escaped newlines.
                    samples = [result] if isinstance(result, str) else list(result)
                    text = "\n\n".join(str(sample) for sample in samples)
                    out.write(f"\n>> {name}:\n{text}\n")
                except Exception as e:
                    # Best effort: record the failure and keep going.
                    out.write(f"\n>> {name} FAILED: {e}\n")

if __name__ == "__main__":
    main()