In [None]:
from transformers.tokenization_utils_base import PreTrainedTokenizerBase
from transformers.models.auto.tokenization_auto import AutoTokenizer
import torch

In [None]:
# Toy batch for exercising the tokenization helper: three prompts and their paired responses.
testin = [
    "hello world!",
    "second input:",
    "third input:",
]
testout = [
    "hi!!!!!!!!",
    "yes youre second wow so great",
    "now finished",
]
# The tokenizer is read from a local checkpoint directory.
tokenizer = AutoTokenizer.from_pretrained(
    "/home/nova/cs336/assignment5-alignment/models/Qwen2.5-Math-1.5B"
)

In [None]:
def get_mask_tensor(io_len:list[tuple[int,int]])->torch.Tensor:
    """
    Build a response mask from known prompt/response token lengths.

    Each row is laid out as ``[0] * prompt_len + [1] * response_len + [0] * padding``,
    so only the model-output positions are exposed while the prompt and the
    trailing padding are masked out.

    Args:
        io_len(list[tuple[int, int]]): One ``(prompt_len, response_len)`` pair per sequence.
    Returns:
        output(torch.Tensor): int64 tensor of shape
        ``(len(io_len), max(prompt_len + response_len))``. An empty ``io_len``
        yields an empty ``(0, 0)`` tensor instead of raising.
    """
    # default=0 keeps max() from raising on an empty batch.
    max_len = max((ilen + olen for ilen, olen in io_len), default=0)
    rows = [
        [0] * ilen + [1] * olen + [0] * (max_len - ilen - olen)
        for ilen, olen in io_len
    ]
    # Pin dtype and shape explicitly: with nothing to infer from (empty batch or
    # all-zero lengths) torch.tensor() would otherwise return a float tensor,
    # and for an empty batch one of the wrong rank.
    return torch.tensor(rows, dtype=torch.long).reshape(len(rows), max_len)

In [None]:
def tokenize_prompt_and_output(
        prompt_strs:list[str],
        output_strs:list[str], 
        tokenizer:PreTrainedTokenizerBase ) -> dict[str, torch.Tensor]:
    """
    Tokenize the prompt and output strings, and construct a mask that is 1 for the response tokens and 0 for other tokens (prompt or padding).

    Prompts and outputs are encoded separately with ``tokenizer.encode``, concatenated per example,
    and right-padded to the longest sequence in the batch.

    Args:
        prompt_strs(list[str]): List of prompt strings.
        output_strs(list[str]): List of output strings.
        tokenizer(PreTrainedTokenizer): Tokenizer to use for tokenization.
    Returns:
        output(dict[str, torch.Tensor]): Let prompt_and_output_lens be a list containing the lengths of the tokenized prompt and output strings. Then the returned dictionary should have the following keys.
        - input_ids: torch.Tensor of shape (batch_size, max(prompt_and_output_lens) - 1): the tokenized prompt and output strings, with the final token sliced off.
        - labels: torch.Tensor of shape (batch_size, max(prompt_and_output_lens) - 1): shifted input ids, i.e., the input ids without the first token.
        - response_mask: torch.Tensor of shape (batch_size, max(prompt_and_output_lens) - 1): a mask on the response tokens in the labels.
        For an empty batch all three tensors are empty with shape (0, 0).
    Raises:
        AssertionError: If the number of prompts differs from the number of outputs.
        ValueError: If sequences must be padded but the tokenizer defines neither
            ``pad_token_id`` nor ``eos_token_id``.
    """
    assert len(prompt_strs) == len(output_strs) , "输入与输出数量不等！"

    # Nothing to pad or mask: return well-formed empty tensors instead of failing in max().
    if len(prompt_strs) == 0:
        return {
            "input_ids": torch.zeros((0, 0), dtype=torch.long),
            "labels": torch.zeros((0, 0), dtype=torch.long),
            "response_mask": torch.zeros((0, 0), dtype=torch.long),
        }

    prompt_ids = [tokenizer.encode(text) for text in prompt_strs]
    response_ids = [tokenizer.encode(text) for text in output_strs]

    # (prompt_len, response_len) per example; drives both padding and the mask.
    io_len = [(len(p_ids), len(r_ids)) for p_ids, r_ids in zip(prompt_ids, response_ids)]
    max_len = max(p_len + r_len for p_len, r_len in io_len)

    # Some tokenizers define no pad token. Padded positions are excluded by the
    # response mask, so the EOS id is used as the fill value in that case.
    padding_id = tokenizer.pad_token_id
    if padding_id is None:
        padding_id = tokenizer.eos_token_id
    if padding_id is None and any(p_len + r_len < max_len for p_len, r_len in io_len):
        raise ValueError(
            "tokenizer defines neither pad_token_id nor eos_token_id; "
            "cannot pad sequences of different lengths"
        )

    # Build fresh padded rows rather than extending the concatenated lists in place.
    batch_ids = [
        p_ids + r_ids + [padding_id] * (max_len - len(p_ids) - len(r_ids))
        for p_ids, r_ids in zip(prompt_ids, response_ids)
    ]

    padded_batch_tensor = torch.tensor(batch_ids)
    mask_tensor = get_mask_tensor(io_len)

    # input_ids drops the last token and labels drops the first, so labels[t] is the
    # token that follows input_ids[t]; the mask is shifted like the labels.
    return {
        "input_ids":padded_batch_tensor[:,:-1],
        "labels":padded_batch_tensor[:,1:],
        "response_mask":mask_tensor[:,1:]
    }

# Run the helper on the toy batch; the returned dict is displayed as the cell output.
tokenize_prompt_and_output(
    prompt_strs=testin,
    output_strs=testout,
    tokenizer=tokenizer,
)

### Problem (compute_entropy): Per-token entropy (1 point)

**Deliverable:** Implement a method `compute_entropy` that computes the per-token entropy of next-token predictions. The following interface is recommended:

```python
def compute_entropy(logits: torch.Tensor) -> torch.Tensor:
```
> Get the entropy of the next-token predictions (i.e., entropy over the vocabulary dimension).

- **Args:**
    - `logits: torch.Tensor`: Tensor of shape `(batch_size, sequence_length, vocab_size)` containing unnormalized logits.
- **Returns:**
    - `torch.Tensor`: Shape `(batch_size, sequence_length)`. The entropy for each next-token prediction.

**Note:** you should use a numerically stable method (e.g., using `logsumexp`) to avoid overflow.

To test your code, implement `adapters.run_compute_entropy`. Then run `uv run pytest -k test_compute_entropy` and ensure your implementation passes.


In [None]:
import einops
from jaxtyping import Float, Int, Bool
import torch

def compute_entropy(
    logits: Float[torch.Tensor,"batch_size sequence_length vocab_size"]
) -> Float[torch.Tensor,"batch_size sequence_length"]:
    """
    Get the entropy of the next-token predictions (i.e., entropy over the vocabulary dimension).

    The input tensor is never modified: the previous in-place max subtraction
    silently changed the caller's logits and failed on leaf tensors that require grad.

    Args:
        logits(torch.Tensor): Tensor of shape `(batch_size, sequence_length, vocab_size)` containing unnormalized logits.
            Integer tensors are accepted and converted to the default floating dtype.
    Returns:
        output(torch.Tensor): Shape `(batch_size, sequence_length)`. The entropy for each next-token prediction.
    """
    # exp/log are only defined for floating dtypes; promote integer input out of place.
    if not torch.is_floating_point(logits):
        logits = logits.to(torch.get_default_dtype())

    # log_softmax subtracts the per-row max before exponentiating (the stable
    # logsumexp formulation), so large logits do not overflow.
    log_probs = torch.log_softmax(logits, dim=-1)

    # H = -sum_v p_v * log p_v, reduced over the vocabulary dimension.
    return -(log_probs.exp() * log_probs).sum(dim=-1)


# Smoke test on a tiny integer batch of shape (2, 2, 2).
a = torch.tensor([
    [[1, 2], [3, 4]],
    [[2, 3], [4, 5]],
])
compute_entropy(a)

In [None]:
# Small integer vector [1, 2, 3] for the scratch log check below.
a = torch.arange(1, 4)
a

In [None]:
# Element-wise negative natural log of `a`.
torch.neg(torch.log(a))

In [None]:
import numpy as np

# np.load on an .npz archive returns an NpzFile that keeps the underlying file open;
# the context manager closes it once the inspection is done.
with np.load("/home/nova/cs336/assignment5-alignment/tests/_snapshots/test_compute_entropy.npz") as data:
    print(data.files)

    for key in data.files:
        # Each data[key] access reads the array from the archive, so read it once.
        arr = data[key]
        print(f"{key}: {arr.shape}")
        print(arr)

In [None]:
# Fix the global seed so the random logits (batch=2, seq=10, vocab=100) are reproducible.
torch.manual_seed(42)
inputs = torch.randn(2, 10, 100)
compute_entropy(inputs)

In [None]:
# Tiny random logits tensor of shape (1, 2, 3): print it, then show its per-position entropy.
test = torch.randn(1, 2, 3)
print(test)
compute_entropy(test)

### Problem (get_response_log_probs): Response log-probs (and entropy) (2 points)

**Deliverable:** Implement a method `get_response_log_probs` that gets per-token conditional log-probabilities (given the previous tokens) from a causal language model, and optionally the entropy of the model’s next-token distribution. The following interface is recommended:

```python
def get_response_log_probs(
    model: PreTrainedModel,
    input_ids: torch.Tensor,
    labels: torch.Tensor,
    return_token_entropy: bool = False,
) -> dict[str, torch.Tensor]:
```

- **Args:**
    - `model: PreTrainedModel`: HuggingFace model used for scoring (placed on the correct device and in inference mode if gradients should not be computed).
    - `input_ids: torch.Tensor`: shape `(batch_size, sequence_length)`, concatenated prompt + response tokens as produced by your tokenization method.
    - `labels: torch.Tensor`: shape `(batch_size, sequence_length)`, labels as produced by your tokenization method.
    - `return_token_entropy: bool`: If `True`, also return per-token entropy by calling `compute_entropy`.
- **Returns:**
    - `dict[str, torch.Tensor]`:
        - `"log_probs"`: shape `(batch_size, sequence_length)`, conditional log-probabilities $\log p_\theta(x_t \mid x_{<t})$.
        - `"token_entropy"`: optional, shape `(batch_size, sequence_length)`, per-token entropy for each position (present only if `return_token_entropy=True`).

**Implementation tips:**
- Obtain logits with `model(input_ids).logits`.

To test your code, implement `adapters.run_get_response_log_probs`. Then run `uv run pytest -k test_get_response_log_probs` and ensure the test passes.

In [2]:
%load_ext autoreload
%autoreload 2

In [3]:
from cs336_alignment.mysft import *
from transformers.models.auto.modeling_auto import AutoModelForCausalLM
from transformers.models.auto.tokenization_auto import AutoTokenizer

# Tokenizer and model weights are both loaded from the same local checkpoint directory.
tokenizer = AutoTokenizer.from_pretrained(
    "/home/nova/cs336/assignment5-alignment/models/Qwen2.5-Math-1.5B"
)
model = AutoModelForCausalLM.from_pretrained(
    "/home/nova/cs336/assignment5-alignment/models/Qwen2.5-Math-1.5B"
)


Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


In [4]:

# One prompt/response pair, tokenized into the tensors used by the scoring cell.
prompts = ["Question: 2+2=? Answer:"]
responses = [" 4"]

tok = tokenize_prompt_and_output(prompts, responses, tokenizer)
input_ids, labels = tok["input_ids"], tok["labels"]

print(tok)

{'input_ids': tensor([[14582,    25,   220,    17,    10,    17, 19884, 21806,    25,   220]]), 'labels': tensor([[   25,   220,    17,    10,    17, 19884, 21806,    25,   220,    19]]), 'response_mask': tensor([[0, 0, 0, 0, 0, 0, 0, 0, 1, 1]])}


In [12]:
from cs336_alignment.mysft import *
# Scoring only: run the forward pass with autograd tracking disabled.
with torch.no_grad():
    get_response_log_probs(
        model,
        input_ids,
        labels,
        return_token_entropy=True,
    )

tensor([[[ 7.9035, 11.9540, 11.8372,  ..., -2.5852, -2.5849, -2.5850],
         [ 2.5860,  4.3740,  3.6240,  ..., -4.1906, -4.1907, -4.1910],
         [-1.2723, -1.3299,  0.0963,  ..., -2.0321, -2.0321, -2.0322],
         ...,
         [11.0863,  4.5606,  9.9343,  ..., -2.4296, -2.4293, -2.4298],
         [ 1.5735,  0.9405,  0.1129,  ..., -4.3587, -4.3588, -4.3593],
         [ 0.3099, -2.6036, -0.9576,  ..., -1.6830, -1.6830, -1.6833]]])
torch.Size([1, 10, 151936])
tensor([[   25,   220,    17,    10,    17, 19884, 21806,    25,   220,    19]])
torch.Size([1, 10])
torch.Size([1, 10])


In [None]:
# torch.gather demo: for every (row, col) position, pick the entry of P along the
# last axis whose index is stored in I.
P = torch.randint(0, 5, (2, 3, 4))
print(P)
I = torch.tensor([
    [1, 0, 2],
    [2, 2, 1],
])

# Add a trailing axis to the index so it matches P's rank, then drop it from the result.
R = P.gather(2, I.unsqueeze(-1)).squeeze(-1)
print(R)

In [18]:
import math
# Probability assigned to token 0 for logits [1, 2, 3].
logits = torch.tensor([[[1,2,3]]])
# Named token_id rather than `id` so the builtin id() is not shadowed for the rest of the session.
token_id = torch.tensor([[0]])
prob = compute_prob_given_id(logits, token_id)
print(prob)

tensor([[0.0900]])


In [19]:
# Closed-form cross-check for logits [1, 2, 3] and token 0:
# softmax = e / (e + e**2 + e**3) = 1 / (1 + e + e**2) = (e - 1) / (e**3 - 1).
(math.e - 1)/(math.e ** 3 - 1)

0.09003057317038046

### SFT microbatch train step

The loss we minimize in SFT is the negative log-likelihood of the target output given the prompt. To compute this loss, we need the log-probabilities of the target output tokens given the prompt, summed over all tokens in the output while masking out the prompt tokens and the padding tokens.

We will implement a helper function for this, which we will also use later during RL.

### Problem (masked_normalize): Masked normalize (1 point)

**Deliverable:** Implement a method `masked_normalize` that sums over tensor elements and normalizes by a constant while respecting a boolean mask. The following interface is recommended:

```python
def masked_normalize(
    tensor: torch.Tensor,
    mask: torch.Tensor,
    normalize_constant: float,
    dim: int | None = None,
) -> torch.Tensor:
```
> Sum over a dimension and normalize by a constant, considering only those elements where `mask == 1`.

- **Args:**
    - `tensor: torch.Tensor`: The tensor to sum and normalize.
    - `mask: torch.Tensor`: Same shape as `tensor`; positions with `1` are included in the sum.
    - `normalize_constant: float`: the constant to divide by for normalization.
    - `dim: int | None`: the dimension to sum along before normalization. If `None`, sum over all dimensions.
- **Returns:**
    - `torch.Tensor`: the normalized sum, where masked elements (`mask == 0`) don’t contribute to the sum.

To test your code, implement `adapters.run_masked_normalize`. Then run `uv run pytest -k test_masked_normalize` and ensure it passes.

In [2]:
%load_ext autoreload
%autoreload 2


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [8]:
from cs336_alignment.mysft import *
# 2x3 example: entries where mask == 1 are summed along `dim` and divided by the constant `c`.
tensor = torch.tensor([
    [1, 2, 3],
    [4, 5, 6],
])
mask = torch.tensor([
    [1, 0, 1],
    [1, 1, 0],
])
c, dim = 1.0, 0

masked_normalize(tensor, mask, c, dim)

hello
tensor([[1, 0, 3],
        [4, 5, 0]])


tensor([[5., 5., 3.]])