**1. Import and Install**

In [None]:
!pip install -U peft transformers accelerate



In [None]:
import os
import math
import random
from dataclasses import dataclass
from typing import Dict, List, Any

import torch
from torch.utils.data import Dataset

from transformers import (
    AutoTokenizer,
    AutoModelForCausalLM,
    Trainer,
    TrainingArguments,
    set_seed,
)

from peft import (
    PromptTuningConfig,
    PromptTuningInit,
    TaskType,
    get_peft_model,
)


**2. Repro / device**

In [None]:
# Fix the RNG seed so repeated runs draw the same synthetic data.
set_seed(42)

# Prefer the GPU when one is visible, otherwise fall back to CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"


**3. Build a non-trivial dataset (multi-format instruction tuning)**

In [None]:
# Seed material for the synthetic dataset. Each entry describes one task:
#   "task"        - short identifier used to pick the matching input perturbation
#   "instruction" - the instruction text shown to the model
#   "examples"    - list of (input_text, expected_output) pairs
TASKS = [
    {
        "task": "format_to_json",
        "instruction": "Convert the user info into strict JSON with keys: name, age, city.",
        "examples": [
            ("Name: Alice; Age: 24; City: Seattle", '{"name":"Alice","age":24,"city":"Seattle"}'),
            ("Name: Bob; Age: 31; City: Austin", '{"name":"Bob","age":31,"city":"Austin"}'),
        ],
    },
    {
        "task": "classify_sentiment",
        "instruction": "Classify sentiment as one of: positive, negative, neutral. Output only the label.",
        "examples": [
            ("I love this phone, battery lasts forever.", "positive"),
            ("This update broke everything and I’m furious.", "negative"),
            ("It arrived yesterday.", "neutral"),
        ],
    },
    {
        "task": "extract_fields",
        "instruction": "Extract email fields. Output as: TO=<...> | SUBJECT=<...> | ACTION=<...>.",
        "examples": [
            ("Email: To John, Subject: Meeting, Please reschedule to Friday.", "TO=John | SUBJECT=Meeting | ACTION=reschedule to Friday"),
            ("Email: To HR, Subject: PTO, I want to request 2 days off next week.", "TO=HR | SUBJECT=PTO | ACTION=request 2 days off next week"),
        ],
    },
]

In [None]:
def make_synthetic_samples(n: int = 300) -> List[Dict[str, str]]:
    """Draw ``n`` random rows from ``TASKS``, lightly perturbing some inputs.

    For every row a task and one of its (input, output) examples are chosen at
    random. With probability 0.3 the input text is then altered in a
    task-specific way; the expected output is never modified.

    Args:
        n: Number of rows to generate.

    Returns:
        A list of dicts with keys ``task``, ``instruction``, ``input``, ``output``.
    """
    rows: List[Dict[str, str]] = []
    for _ in range(n):
        spec = random.choice(TASKS)
        text, target = random.choice(spec["examples"])
        kind = spec["task"]

        # Exactly one branch can match, so at most one perturbation roll is made per row.
        if kind == "format_to_json":
            if random.random() < 0.3:
                text = text.replace(";", ",").replace("Age:", "age:")
        elif kind == "classify_sentiment":
            if random.random() < 0.3:
                text = f"{text} {random.choice(['Honestly.', 'FYI.', 'No joke.'])}"
        elif kind == "extract_fields":
            if random.random() < 0.3:
                text = text.replace("Email:", "Message:")

        rows.append(
            dict(task=kind, instruction=spec["instruction"], input=text, output=target)
        )
    return rows

In [None]:
# Generate 1000 synthetic rows, shuffle them, and hold out the last 10% for evaluation.
ALL = make_synthetic_samples(1000)
random.shuffle(ALL)
split = int(0.9 * len(ALL))
TRAIN, EVAL = ALL[:split], ALL[split:]


**4. Tokenizer + formatting**

In [None]:
# Checkpoint name used for the tokenizer here and for the base model later on.
model_name = "microsoft/phi-2"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# If the tokenizer defines no pad_token, reuse EOS so batches can be padded.
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
def format_example(ex: Dict[str, str]) -> str:
    """Render a full training example (prompt plus ground-truth output).

    The text is made of four ``### <Header>`` sections (System, Instruction,
    Input, Output) separated by blank lines. The layout is kept fixed because
    the learned soft prompt is tuned against this exact template.

    Args:
        ex: Row with the keys ``instruction``, ``input`` and ``output``.

    Returns:
        The rendered example, ending with the expected output (no trailing newline).
    """
    sections = [
        ("### System", "You are a precise assistant. Follow the user instruction exactly."),
        ("### Instruction", ex["instruction"]),
        ("### Input", ex["input"]),
        ("### Output", ex["output"]),
    ]
    return "\n\n".join(f"{header}\n{body}" for header, body in sections)

In [None]:
def format_prompt_for_inference(instruction: str, user_input: str) -> str:
    """Render the prompt template up to and including the ``### Output`` header.

    Args:
        instruction: Task instruction placed under ``### Instruction``.
        user_input: Text placed under ``### Input``.

    Returns:
        The prompt, ending with ``"### Output\\n"`` so the model continues with the answer.
    """
    lines = [
        "### System",
        "You are a precise assistant. Follow the user instruction exactly.",
        "",
        "### Instruction",
        f"{instruction}",
        "",
        "### Input",
        f"{user_input}",
        "",
        "### Output",
        "",  # produces the trailing newline after the header
    ]
    return "\n".join(lines)

In [None]:
class MixedInstructionDataset(Dataset):
    """Map-style dataset that turns instruction rows into causal-LM training tensors.

    Each row is rendered with the same ``### System / ### Instruction / ### Input /
    ### Output`` template that ``format_prompt_for_inference`` emits, so the model is
    trained on exactly the prompt it will see at generation time. The answer is
    followed by the tokenizer's EOS token (when it has one) so the model learns
    where to stop. Prompt positions are masked with -100 so the loss only covers
    the answer (and EOS).

    Args:
        rows: Dicts with at least the keys ``instruction``, ``input`` and ``output``.
        max_length: Maximum number of tokens kept per example (truncated on the right).
        tokenizer_override: Optional tokenizer-like callable to use instead of the
            notebook-level ``tokenizer``. It must accept
            ``(text, add_special_tokens=False)`` and return a mapping with an
            ``"input_ids"`` list; an ``eos_token_id`` attribute is used if present.
    """

    IGNORE_INDEX = -100

    def __init__(self, rows: List[Dict[str, str]], max_length: int = 256, tokenizer_override: Any = None):
        self.rows = rows
        self.max_length = max_length
        self._tokenizer_override = tokenizer_override

    def __len__(self):
        return len(self.rows)

    def _resolve_tokenizer(self):
        """Return the override if given, else the notebook-level ``tokenizer`` (looked up lazily)."""
        if self._tokenizer_override is not None:
            return self._tokenizer_override
        return tokenizer

    def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:
        """Build ``input_ids``, ``attention_mask`` and ``labels`` (1-D long tensors) for row ``idx``."""
        ex = self.rows[idx]
        tok = self._resolve_tokenizer()

        # Prompt WITHOUT the ground-truth answer. The final header must stay
        # "### Output" to match the inference prompt and the marker used when
        # post-processing generations.
        prompt = (
            "### System\n"
            "You are a precise assistant. Follow the user instruction exactly.\n\n"
            "### Instruction\n"
            f"{ex['instruction']}\n\n"
            "### Input\n"
            f"{ex['input']}\n\n"
            "### Output\n"
        )

        # Ground-truth answer (keep it clean, no extra headers)
        answer = ex["output"].strip()

        # Tokenize prompt and answer separately and concatenate the ids, so the
        # prompt/answer boundary is exact and cannot drift with subword merges.
        prompt_ids = list(tok(prompt, add_special_tokens=False)["input_ids"])
        answer_ids = list(tok(answer, add_special_tokens=False)["input_ids"])

        # Teach the model to terminate after the answer.
        eos_id = getattr(tok, "eos_token_id", None)
        if eos_id is not None:
            answer_ids.append(eos_id)

        ids = (prompt_ids + answer_ids)[: self.max_length]

        # Labels: mask out prompt tokens so loss only applies to the answer.
        n_prompt = min(len(prompt_ids), len(ids))
        label_ids = [self.IGNORE_INDEX] * n_prompt + ids[n_prompt:]

        return {
            "input_ids": torch.tensor(ids, dtype=torch.long),
            "attention_mask": torch.ones(len(ids), dtype=torch.long),
            "labels": torch.tensor(label_ids, dtype=torch.long),
        }


In [None]:
@dataclass
class CausalLMCollator:
    """Right-pad a list of variable-length examples into one batch.

    ``input_ids``, ``attention_mask`` and ``labels`` are padded to the longest
    example in the batch and keep the SAME length and alignment. No room is made
    for the prompt-tuning virtual tokens here: the PEFT model prepends the
    virtual-token embeddings and the matching attention mask itself during
    ``forward``, and ``PromptTuningTrainer.compute_loss`` left-pads the labels to
    the resulting logits length. Adding the prefix in the collator as well would
    shift labels and attention mask relative to the real tokens.

    Attributes (documented here; see fields below):
        pad_token_id: Fill value for padded ``input_ids`` positions.
        num_virtual_tokens: Accepted for backward compatibility; not used.
        label_pad_id: Fill value for padded ``labels`` positions (ignored by the loss).
    """

    pad_token_id: int
    num_virtual_tokens: int = 0  # kept so existing constructor calls still work; unused
    label_pad_id: int = -100

    def __call__(self, features):
        """Collate ``features`` (dicts of 1-D tensors) into a dict of 2-D ``[batch, max_len]`` tensors."""
        max_len = max(f["input_ids"].size(0) for f in features)

        def pad_right(seq, fill):
            # Append `fill` until the sequence reaches the batch's max length.
            missing = max_len - seq.size(0)
            if missing <= 0:
                return seq
            return torch.cat([seq, seq.new_full((missing,), fill)], dim=0)

        return {
            "input_ids": torch.stack([pad_right(f["input_ids"], self.pad_token_id) for f in features]),
            # Padding positions get mask 0 so they are never attended to.
            "attention_mask": torch.stack([pad_right(f["attention_mask"], 0) for f in features]),
            "labels": torch.stack([pad_right(f["labels"], self.label_pad_id) for f in features]),
        }



**5. Base model + Prompt Tuning (PEFT)**

In [None]:

# -----------------------------
# Base model (phi-2)
# -----------------------------
# Loads the checkpoint in half precision and lets `device_map="auto"` decide
# where the weights are placed.
# NOTE: newer transformers releases warn that `torch_dtype` is deprecated in
# favour of `dtype` (see the warning in this cell's output).
base_model = AutoModelForCausalLM.from_pretrained(
    model_name,
    trust_remote_code=True,        # NOTE(review): may be unnecessary on transformers versions with native Phi support — confirm
    torch_dtype=torch.float16,
    device_map="auto"
)

# Make sure pad token is defined at model level
base_model.config.pad_token_id = tokenizer.pad_token_id

# -----------------------------
# Prompt tuning config
# -----------------------------
peft_config = PromptTuningConfig(
    task_type=TaskType.CAUSAL_LM,

    # Initialise the soft prompt from the embeddings of a short text
    prompt_tuning_init=PromptTuningInit.TEXT,
    prompt_tuning_init_text=(
        "Follow the instruction. "
        "Output only the final answer."
    ),

    # Number of trainable soft-prompt vectors (8 x 2560 = 20,480 trainable params,
    # matching the printed summary below)
    num_virtual_tokens=8,

    tokenizer_name_or_path=model_name,
)

# -----------------------------
# Apply PEFT
# -----------------------------
model = get_peft_model(base_model, peft_config)

# Sanity check: trainable params should be tiny
model.print_trainable_parameters()

# NOTE(review): the model was loaded with device_map="auto", so this explicit
# move may be redundant — confirm. As the cell's last expression it also makes
# the notebook print the full model repr.
model.to(device)

`torch_dtype` is deprecated! Use `dtype` instead!


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

trainable params: 20,480 || all params: 2,779,704,320 || trainable%: 0.0007


PeftModelForCausalLM(
  (base_model): PhiForCausalLM(
    (model): PhiModel(
      (embed_tokens): Embedding(51200, 2560)
      (layers): ModuleList(
        (0-31): 32 x PhiDecoderLayer(
          (self_attn): PhiAttention(
            (q_proj): Linear(in_features=2560, out_features=2560, bias=True)
            (k_proj): Linear(in_features=2560, out_features=2560, bias=True)
            (v_proj): Linear(in_features=2560, out_features=2560, bias=True)
            (dense): Linear(in_features=2560, out_features=2560, bias=True)
          )
          (mlp): PhiMLP(
            (activation_fn): NewGELUActivation()
            (fc1): Linear(in_features=2560, out_features=10240, bias=True)
            (fc2): Linear(in_features=10240, out_features=2560, bias=True)
          )
          (input_layernorm): LayerNorm((2560,), eps=1e-05, elementwise_affine=True)
          (resid_dropout): Dropout(p=0.1, inplace=False)
        )
      )
      (rotary_emb): PhiRotaryEmbedding()
      (embed_dropout

In [None]:

# Wrap both splits with the same max sequence length.
train_ds, eval_ds = (
    MixedInstructionDataset(split_rows, max_length=256) for split_rows in (TRAIN, EVAL)
)

# Batch collator; pad id and virtual-token count come from the tokenizer / PEFT config.
collator = CausalLMCollator(
    num_virtual_tokens=peft_config.num_virtual_tokens,
    pad_token_id=tokenizer.pad_token_id,
)


**6. Metrics (optional): perplexity on eval**

In [None]:
def compute_metrics(eval_pred):
    """Compute token-level cross-entropy and a (clamped) perplexity for a causal LM.

    Args:
        eval_pred: A ``(logits, labels)`` pair. ``logits`` is ``[N, S_logit, V]`` and
            ``labels`` is ``[N, S_label]`` with -100 marking ignored positions.
            With prompt tuning the logits also cover the prepended virtual tokens,
            so ``S_logit`` may exceed ``S_label``.

    Returns:
        ``{"eval_loss_ce": <mean CE over non-ignored tokens>, "perplexity": exp(CE)}``
        with perplexity capped at 1e4.
    """
    logits, labels = eval_pred
    # as_tensor avoids an extra copy; cast so CE works for fp16 logits / int32 labels.
    logits_t = torch.as_tensor(logits).float()
    labels_t = torch.as_tensor(labels).long()

    # Align labels with logits the same way the training loss does: extra logit
    # positions sit on the LEFT (virtual tokens), so pad labels on the left with -100.
    gap = logits_t.size(1) - labels_t.size(1)
    if gap > 0:
        left_pad = labels_t.new_full((labels_t.size(0), gap), -100)
        labels_t = torch.cat([left_pad, labels_t], dim=1)
    elif gap < 0:
        labels_t = labels_t[:, -logits_t.size(1):]

    # shift for causal LM: position t predicts token t+1
    shift_logits = logits_t[:, :-1, :].contiguous()
    shift_labels = labels_t[:, 1:].contiguous()

    loss_fct = torch.nn.CrossEntropyLoss(ignore_index=-100)
    loss = loss_fct(
        shift_logits.view(-1, shift_logits.size(-1)),
        shift_labels.view(-1),
    )
    ppl = float(torch.exp(loss).clamp(max=1e4))
    return {"eval_loss_ce": float(loss), "perplexity": ppl}

**7. Train**

In [None]:
from transformers import Trainer
import torch
import torch.nn.functional as F

class PromptTuningTrainer(Trainer):
    """Trainer whose loss is computed here instead of inside the model's forward pass."""

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Run the model without labels and return a shifted cross-entropy loss.

        Labels are removed from ``inputs`` before the forward pass. If the logits
        and labels differ in sequence length, labels are left-padded with -100
        (logits longer) or trimmed to their last positions (logits shorter).
        Returns ``(loss, outputs)`` when ``return_outputs`` is true, else ``loss``.
        """
        # Withhold labels so the model does not compute its own default loss.
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits  # [B, S_logit, V]

        # Bring labels to the logits' sequence length.
        seq_gap = logits.size(1) - labels.size(1)
        if seq_gap > 0:
            left_pad = labels.new_full((labels.size(0), seq_gap), -100)
            labels = torch.cat([left_pad, labels], dim=1)
        elif seq_gap < 0:
            labels = labels[:, -logits.size(1):]

        # Next-token objective: logits at position t are scored against label t+1.
        pred = logits[:, :-1, :].contiguous()
        target = labels[:, 1:].contiguous()
        vocab_size = pred.size(-1)

        loss = F.cross_entropy(pred.view(-1, vocab_size), target.view(-1), ignore_index=-100)

        if return_outputs:
            return loss, outputs
        return loss

In [None]:
# Directory for checkpoints and the final adapter/tokenizer.
# NOTE: the name says "gpt2" but the model configured in this notebook is phi-2.
out_dir = "./prompt_tuned_gpt2"
args = TrainingArguments(
    output_dir=out_dir,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    gradient_accumulation_steps=2,  # 8 x 2 = 16 examples per optimizer step per device
    learning_rate=2e-3,           # prompt vectors can handle higher LR
    num_train_epochs=60,
    warmup_ratio=0.06,
    weight_decay=0.0,
    logging_steps=20,
    eval_strategy="epoch",        # evaluate once per epoch
    save_strategy="epoch",        # checkpoint once per epoch
    save_total_limit=2,           # keep only the two most recent checkpoints
    fp16=torch.cuda.is_available(),  # mixed precision only when a GPU is present
    report_to="none",             # no external experiment tracker
)

In [None]:
# Build the trainer with the custom loss. Note that `compute_metrics` from
# section 6 is not passed here, so only the loss is reported during evaluation.
trainer = PromptTuningTrainer(
    model=model,
    args=args,
    train_dataset=train_ds,
    eval_dataset=eval_ds,
    data_collator=collator,
)
trainer.train()
# Final evaluation pass; the returned metrics dict is not stored or displayed.
trainer.evaluate()

# Save ONLY the prompt-tuning adapter (small)
model.save_pretrained(out_dir)
# Save the tokenizer next to it; as the last expression, the list of written
# files is shown as the cell output.
tokenizer.save_pretrained(out_dir)


The model is already on multiple devices. Skipping the move to device specified in `args`.


Epoch,Training Loss,Validation Loss
1,27.2183,11.586146
2,19.5667,8.108616
3,14.2314,5.678496
4,9.7717,3.784669
5,5.6452,1.697176
6,3.3121,1.15371
7,2.6866,0.79391
8,2.2391,0.82109
9,1.7941,0.597276
10,1.6998,0.457002


('./prompt_tuned_gpt2/tokenizer_config.json',
 './prompt_tuned_gpt2/special_tokens_map.json',
 './prompt_tuned_gpt2/vocab.json',
 './prompt_tuned_gpt2/merges.txt',
 './prompt_tuned_gpt2/added_tokens.json',
 './prompt_tuned_gpt2/tokenizer.json')

**8. Inference helper**

In [None]:
@torch.no_grad()
def generate_answer(instruction: str, user_input: str, max_new_tokens: int = 60) -> str:
    """Generate the model's answer for one instruction/input pair.

    The prompt is built with ``format_prompt_for_inference``, the model samples a
    continuation, and only the text after the ``### Output`` header is returned.

    Args:
        instruction: Task instruction text.
        user_input: The input the instruction applies to.
        max_new_tokens: Upper bound on the number of generated tokens.

    Returns:
        The stripped text following ``"### Output\\n"``; if that marker is not
        found in the decoded text, the whole decoded text (stripped).
    """
    model.eval()
    prompt = format_prompt_for_inference(instruction, user_input)

    enc = tokenizer(prompt, return_tensors="pt").to(device)

    out = model.generate(
        **enc,
        max_new_tokens=max_new_tokens,  # honour the caller's limit (was hard-coded to 60)
        do_sample=True,
        temperature=0.7,
        top_p=0.9,
        repetition_penalty=1.2,
        no_repeat_ngram_size=3,
        eos_token_id=tokenizer.eos_token_id,
        pad_token_id=tokenizer.eos_token_id,
    )

    text = tokenizer.decode(out[0], skip_special_tokens=True)

    # Return only the generated part after "### Output"
    marker = "### Output\n"
    if marker in text:
        return text.split(marker, 1)[1].strip()
    return text.strip()



**9. Quick demo tests**

In [None]:
# (instruction, input) pairs: one unseen input per task type.
tests = [
    (
        "Convert the user info into strict JSON with keys: name, age, city.",
        "Name: Chen; Age: 29; City: Boston",
    ),
    (
        "Classify sentiment as one of: positive, negative, neutral. Output only the label.",
        "This is fine. Nothing special.",
    ),
    (
        "Extract email fields. Output as: TO=<...> | SUBJECT=<...> | ACTION=<...>.",
        "Email: To Alex, Subject: Budget, please reduce the cost by 10%.",
    ),
]

# Print each prompt next to what the tuned model generates for it.
for inst, inp in tests:
    ans = generate_answer(inst, inp)
    print(f"\nINSTRUCTION: {inst}")
    print(f"INPUT: {inp}")
    print(f"MODEL OUTPUT: {ans}")




INSTRUCTION: Convert the user info into strict JSON with keys: name, age, city.
INPUT: Name: Chen; Age: 29; City: Boston
MODEL OUTPUT: Chen: 494958c Chen;age: 5844584c Chen.; Chen: 492958a Chen;agge: 5844465a Chen.;
Chens: 593845c 5938458c 5938546c 59386452c
(this is the same

INSTRUCTION: Classify sentiment as one of: positive, negative, neutral. Output only the label.
INPUT: This is fine. Nothing special.
MODEL OUTPUT: Week 1:negative50week1negative50weeks1negative52week2negative49week3negative47week4negative45week5negative44week6positive54week7positive56week8positive58week9positive59week10positive61week11positive62week12positive63week13

INSTRUCTION: Extract email fields. Output as: TO=<...> | SUBJECT=<...> | ACTION=<...>.
INPUT: Email: To Alex, Subject: Budget, please reduce the cost by 10%.
MODEL OUTPUT: Email:\TOAlex@Alex. Email: Alex, Alex.JECTJECTEDalex Alex. 100=100. 200=150. 300=200. 400=250. 500=300. 600=400. 700=500. 800=600. 900=700. 1000=800.
