In [None]:
#
# For licensing see accompanying LICENSE file.
# Copyright (C) 2025 Apple Inc. All Rights Reserved.
#

In [1]:
import pandas as pd
import json
import os
import numpy as np
import nltk.data
import sys
sys.path.append("../")
sys.path.append("../../")
sys.path.insert(0, os.path.abspath('..'))

import torch
import torch as t
import pickle
from sklearn.metrics import roc_auc_score

import nltk
nltk.download('punkt_tab')

## Helper functions

In [None]:
def readjsonl(datapath):
    """Read a JSON Lines file into a list of decoded records.

    Args:
        datapath: Path to a UTF-8 encoded file holding one JSON document
            per line.

    Returns:
        list: The decoded value of every non-blank line, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    with open(datapath, "r", encoding="utf-8") as f:
        # Stream the file instead of materialising it with readlines(), and
        # skip whitespace-only lines (e.g. a stray blank line at the end of
        # the file), which json.loads would otherwise reject.
        return [json.loads(line) for line in f if line.strip()]

# // Get all detailed instructions
def get_inst_list(task_path_ifeval="ifeval_simple"):
    """Return the distinct instruction ids of an IFEval split.

    Args:
        task_path_ifeval: Base name (without extension) of the ``.jsonl``
            file located under ``../data/``.

    Returns:
        list: Every value found in the ``instruction_id_list`` column,
        de-duplicated and kept in first-seen order.
    """
    jsonl_path = "../data/" + task_path_ifeval + ".jsonl"
    eval_df = pd.DataFrame(readjsonl(jsonl_path))
    # dict keys preserve insertion order, which gives an ordered "unique".
    unique_ids = dict.fromkeys(
        inst_id
        for ids_of_prompt in eval_df['instruction_id_list']
        for inst_id in ids_of_prompt
    )
    return list(unique_ids)

# // Get all high level instructions
def get_high_inst_list(task_path_ifeval="ifeval_simple"):
    """Return the distinct high-level instruction categories of an IFEval split.

    The category of an instruction id is the part before its first ``:``
    (``"keywords:frequency"`` -> ``"keywords"``).

    Args:
        task_path_ifeval: Base name (without extension) of the ``.jsonl``
            file located under ``../data/``.

    Returns:
        list: The categories, de-duplicated and kept in first-seen order.
    """
    jsonl_path = "../data/" + task_path_ifeval + ".jsonl"
    eval_df = pd.DataFrame(readjsonl(jsonl_path))
    # dict keys preserve insertion order, which gives an ordered "unique".
    categories = dict.fromkeys(
        inst_id.split(':')[0]
        for ids_of_prompt in eval_df['instruction_id_list']
        for inst_id in ids_of_prompt
    )
    return list(categories)

# // Get all task types (a task is the first sentence of a prompt)
def get_task_list(task_path_ifeval="ifeval_simple"):
    """Return the distinct tasks of an IFEval split.

    A "task" is the first sentence of a prompt, as split by NLTK's Punkt
    sentence tokenizer.

    Args:
        task_path_ifeval: Base name (without extension) of the ``.jsonl``
            file located under ``../data/``.

    Returns:
        list: The first sentence of every prompt, de-duplicated and kept in
        first-seen order. Prompts that contain no sentence are skipped.
    """
    task_path_ifeval = "../data/" + task_path_ifeval + ".jsonl"
    ifeval_eval_df = pd.DataFrame(readjsonl(task_path_ifeval))

    task_list = []
    seen = set()  # O(1) membership test; task_list keeps first-seen order
    for prompt in ifeval_eval_df['prompt']:
        # nltk.sent_tokenize resolves the Punkt model through NLTK itself, so
        # it uses the 'punkt_tab' resource this notebook downloads instead of
        # the legacy 'tokenizers/punkt/english.pickle' file, which that
        # download does not provide.
        sentences = nltk.sent_tokenize(prompt)
        if not sentences:
            # Empty / whitespace-only prompt: there is no first sentence.
            continue
        task = sentences[0]
        if task not in seen:
            seen.add(task)
            task_list.append(task)
    return task_list

# Display the helper outputs as the cell result: detailed instruction ids,
# high-level categories, task sentences, and the number of tasks.
(
    get_inst_list(),
    get_high_inst_list(),
    get_task_list(),
    len(get_task_list()),
)

# Linear Probes

In [6]:
class DataModuleActIfevalSimple:
    """Pairs cached activations with IFEval success labels for probing.

    Samples are kept when the first instruction id of the sample is in
    ``inst_list`` AND the first sentence of its prompt is in ``task_list``.

    Attributes set by ``__init__``:
        layer: Layer index whose activations are loaded.
        ifeval_data: DataFrame read from ``../data/ifeval_simple.jsonl``.
        labels: Float tensor built from the ``follow_all_instructions``
            column, one entry per selected sample.
        acts: Float tensor of activations, row-aligned with ``labels``.
        data: The tuple ``(acts, labels)``.
        saved_layers: Layer keys found in the last loaded activation file.
    """

    def __init__(self,
                 ifeval_eval_path,
                 inst_list, 
                 task_list,
                 layer=13, 
                 target_token='last',
                 center=True,
                 scale=False,
                 device=None,
                 ):
        """Select samples and load their labels and activations.

        Args:
            ifeval_eval_path: Directory containing ``eval_results_loose.jsonl``
                and an ``activations/`` sub-directory of ``sample_{i}.pkl``.
            inst_list: Instruction ids to keep (matched against the first
                instruction id of each sample).
            task_list: Tasks to keep (matched against the first sentence of
                each prompt).
            layer: Layer index to read from each activation file.
            target_token: Suffix of the ``output_token_{...}`` key to read.
            center: Subtract the per-feature mean of the selected samples.
            scale: Divide by the per-feature std of the selected samples.
            device: Device for the activations. ``None`` (default) picks
                ``'cuda'`` when available and ``'cpu'`` otherwise.

        Raises:
            ValueError: If no sample matches both filters.
        """
        self.layer=layer
        if device is None:
            # Previously hard-coded to 'cuda'; fall back to CPU when needed.
            device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # // Load data
        ifeval_data_path = "../data/ifeval_simple.jsonl" 
        self.ifeval_data = self.load_response_df(ifeval_data_path)
        ifeval_eval_df = self.load_response_df(os.path.join(ifeval_eval_path, 'eval_results_loose.jsonl')) # // <-- Please run the IFEval code for generating this file https://huggingface.co/datasets/google/IFEval

        # // Select index by inst
        inst_ind = {
            i for i, inst_ids in enumerate(ifeval_eval_df['instruction_id_list'])
            if inst_ids[0] in inst_list
        }

        # // Select index by task
        # nltk.sent_tokenize uses the 'punkt_tab' resource that this notebook
        # downloads, rather than the legacy 'punkt' pickle file.
        task_ind = {
            i for i, prompt in enumerate(ifeval_eval_df['prompt'])
            if nltk.sent_tokenize(prompt)[0] in task_list
        }

        # // Select index intersection
        # Sorted on purpose: set iteration order is arbitrary, and labels and
        # activations must be gathered in exactly the same order.
        select_ind = sorted(inst_ind & task_ind)

        # // Load acts and labels
        all_labels = torch.tensor(ifeval_eval_df['follow_all_instructions'].tolist())
        self.labels = all_labels[select_ind].float()
        self.acts = self.collect_acts(ifeval_eval_path, layer=self.layer, target_token=target_token, device=device, center=center, scale=scale, index_list=select_ind)
        self.acts = self.acts.float()
        self.data = self.acts, self.labels
        print('Saved layers: ', self.saved_layers)

    def load_response_df(self, task_path, type='loose'):
        """Load a JSON Lines file as a DataFrame (``type`` is unused)."""
        response_df = pd.DataFrame(self.readjsonl(task_path))
        return response_df
    
    def readjsonl(self, datapath):
        """Return the decoded JSON value of every non-blank line of a file."""
        with open(datapath, "r", encoding="utf-8") as f:
            # Blank lines would make json.loads raise, so they are skipped.
            return [json.loads(line) for line in f if line.strip()]
    
    def load_pickle(self, filename: str):
        """Unpickle and return the object stored in ``filename``."""
        # NOTE(security): pickle.load can execute arbitrary code; only point
        # this at activation files you generated yourself.
        with open(filename, "rb") as f:
            return pickle.load(f)

    def collect_acts(self, task_path, layer=13, target_token='last', device='cuda', center=True, scale=False, index_list=None):
        """
        Collects activations from a dataset of statements, returns as a tensor of shape [n_activations, activation_dimension].
        First token: [1, len_input, hidden_emb]
        Last token: [1, 1, hidden_emb]

        Args:
            task_path: Directory containing ``activations/sample_{i}.pkl``.
            layer: Layer index; reads key ``layer_{layer}``.
            target_token: Reads key ``output_token_{target_token}``.
            device: Device the stacked activations are moved to.
            center: Subtract the per-feature mean of the returned rows.
            scale: Divide by the per-feature std of the returned rows.
            index_list: Sample indices to load. Rows of the result follow the
                order of this list. ``None`` loads indices ``0..n-1`` where
                ``n`` is the number of entries in ``activations/``.

        Raises:
            ValueError: If ``index_list`` is empty (nothing to stack).
            FileNotFoundError: If a requested sample file does not exist.
        """
        act_path = os.path.join(task_path, "activations")
        _num_act = len(os.listdir(act_path))
        print('num_act: ', _num_act)

        # Walk the requested indices directly (instead of scanning every
        # index and testing list membership) so that row k of the result is
        # always sample index_list[k].
        indices = range(_num_act) if index_list is None else index_list
        token_key = f'output_token_{target_token}'
        acts = []
        for _idx in indices:
            sample = self.load_pickle(os.path.join(act_path, f"sample_{_idx}.pkl"))
            self.saved_layers = sample[token_key].keys()
            act = sample[token_key][f'layer_{layer}']
            # <-- last of the first token, no problem for last token --> [1, hidden_emb]
            acts.append(act[:,-1])
        if not acts:
            raise ValueError(f"No activation selected from {act_path!r}; index_list is empty.")

        acts = torch.cat(acts, dim=0).to(device)
        if center:
            acts = acts - torch.mean(acts, dim=0)
        if scale:
            acts = acts / torch.std(acts, dim=0)
        return acts


            
        

In [7]:

class LRProbe(t.nn.Module):
    """Bias-free logistic-regression probe: ``sigmoid(w . x)``.

    Attributes:
        net: ``Linear(d_in, 1, bias=False)`` followed by ``Sigmoid``.
        binary_threshold: Default probability cut-off used by ``pred``.
    """

    def __init__(self, d_in, binary_threshold=0.5, **kwargs):
        """Build the probe.

        Args:
            d_in: Size of the input feature dimension.
            binary_threshold: Default decision threshold for ``pred``.
            **kwargs: Accepted and ignored.
        """
        super().__init__()
        self.net = t.nn.Sequential(
            t.nn.Linear(d_in, 1, bias=False),
            t.nn.Sigmoid()
        )
        self.binary_threshold = binary_threshold

    def forward(self, x, iid=None):
        """Return probabilities for ``x`` with the last size-1 dim squeezed (``iid`` is unused)."""
        return self.net(x).squeeze(-1)

    def pred(self, x, iid=None, binary_threshold=None):
        """Return hard 0./1. predictions: 1 where the probability exceeds the threshold.

        ``binary_threshold`` overrides ``self.binary_threshold`` when given.
        """
        binary_threshold = binary_threshold if binary_threshold is not None else self.binary_threshold
        return (self(x)>binary_threshold).float()
    
    def probability(self, x, iid=None):
        """Return the probe's probabilities for ``x`` (same as ``forward``)."""
        return self(x)
    
    @staticmethod
    def from_data(acts, labels, lr=0.001, weight_decay=0.1, epochs=1000, device='cpu', class_weight_one=None, **kwargs):
        """Fit a new probe with full-batch AdamW on a BCE loss.

        Declared as a static method so it can be called on the class or on
        an instance.

        Args:
            acts: Input features; the last dimension is the feature size.
            labels: Float targets in {0, 1}, one per row of ``acts``.
            lr: AdamW learning rate.
            weight_decay: AdamW weight decay.
            epochs: Number of full-batch optimisation steps.
            device: Device on which training runs.
            class_weight_one: Optional loss weight for samples whose label is
                greater than 0; other samples keep weight 1.
            **kwargs: Forwarded to ``LRProbe.__init__`` (for example
                ``binary_threshold``), instead of being silently dropped.

        Returns:
            LRProbe: The trained probe, living on ``device``.
        """
        acts, labels = acts.to(device), labels.to(device)
        probe = LRProbe(acts.shape[-1], **kwargs).to(device)

        # The loss (and its per-sample weights) does not change between
        # epochs, so build it once outside the training loop.
        if class_weight_one is not None:
            class_weight = t.ones_like(labels)
            class_weight[labels>0] = class_weight_one
            loss_fn = t.nn.BCELoss(weight=class_weight)
        else:
            loss_fn = t.nn.BCELoss()

        opt = t.optim.AdamW(probe.parameters(), lr=lr, weight_decay=weight_decay)
        for _ in range(epochs):
            opt.zero_grad()
            loss = loss_fn(probe(acts), labels)
            loss.backward()
            opt.step()
        
        return probe

    @property
    def direction(self):
        """The learned weight vector, i.e. row 0 of the linear layer's weight."""
        return self.net[0].weight.data[0]


# Task generalization

In [None]:
# Task generalization: train a linear probe on 80% of the tasks and measure
# AUROC on the held-out 20%, repeated over several random task splits.
LAYER=14
MODEL='Llama-2-7b-chat-hf'
TOKEN='first'
# Use the GPU when present, otherwise fall back to CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

# '../data/' matches the data root used by every other path in this notebook
# (this cell previously pointed at './data/').
task_path_ifeval = f"../data/{MODEL}/ifeval_simple"

# // Loop-invariant inputs: read the dataset once, not once per seed
task_list = np.array(get_task_list())
inst_list = get_inst_list()  # // Use all instructions
split=0.8
n_train_tasks = int(split * len(task_list))

# // Seed
roc_list=[]
m_roc_list=[]
# Draw the split seeds from a fixed generator so a Restart & Run All
# reproduces the same splits.
seed_list = np.random.default_rng(0).integers(0, 10000, 5)
for seed in seed_list:
    print(seed)

    # // Select train and test task
    torch.manual_seed(int(seed))
    # Boolean numpy mask: True for the tasks assigned to the train split.
    train_ind_list = (torch.randperm(len(task_list)) < n_train_tasks).numpy()
    test_ind_list = ~train_ind_list
    train_task_list = task_list[train_ind_list]
    test_task_list = task_list[test_ind_list]

    # // Get train data
    # NOTE: with center=True/scale=True the data module already standardizes
    # each split with its own statistics.
    train_dm = DataModuleActIfevalSimple(task_path_ifeval, inst_list, train_task_list, layer=LAYER, target_token=TOKEN, center=True, scale=True)
    test_dm = DataModuleActIfevalSimple(task_path_ifeval, inst_list, test_task_list, layer=LAYER, target_token=TOKEN, center=True, scale=True)
    train_acts, train_labels = train_dm.data
    test_acts, test_labels = test_dm.data

    # // Scale and Center
    all_acts = torch.cat((train_acts, test_acts))
    print(all_acts.shape)
    # Compute the train statistics BEFORE normalizing the train set, so the
    # test set is transformed with the real train mean/std rather than with
    # the (~0, ~1) statistics of the already normalized train set.
    train_mean = torch.mean(train_acts, dim=0)
    train_std = torch.std(train_acts, dim=0)
    train_acts = (train_acts - train_mean) / train_std
    test_acts = (test_acts - train_mean) / train_std

    # // Stat of test
    succ = (test_labels==1).sum()
    fail = (test_labels==0).sum()
    print('succ: ', succ)
    print('fail: ', fail)

    # // exception: AUROC is undefined when the test split has a single class
    if succ<1 or fail<1:
        print('skip seed: test split contains a single class')
        continue

    # // Train probe
    probe = LRProbe.from_data(train_acts, train_labels, device=DEVICE, epochs=1000, binary_threshold=0.5)

    # // Test
    test_prob = probe.probability(test_acts.to(DEVICE)).detach().cpu()
    auroc = roc_auc_score(test_labels, test_prob)
    roc_list.append(auroc)

    print(LRProbe.__name__, ': ', auroc)
    print()
    


# Instruction generalization

In [None]:
# Instruction generalization: for every instruction type, train a probe on
# the keyword-style instruction types (leaving the tested one out) and
# measure AUROC on the tested instruction type.
LAYER=14
MODEL='Llama-2-7b-chat-hf'
TOKEN='first'
# Use the GPU when present, otherwise fall back to CPU.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

task_path_ifeval = f"../data/{MODEL}/ifeval_simple"

# // Select train and test inst
inst_list = np.array(get_inst_list())

# // Make a dict for result (one empty list per instruction type)
re = {i: [] for i in inst_list}          # AUROC per instruction type
all_label = {i: [] for i in inst_list}   # test labels per instruction type
all_pred = {i: [] for i in inst_list}    # test probabilities per instruction type
final = {i: [] for i in inst_list}       # AUROC recomputed from the pooled lists
roc_list=[]
total_pred=[]
total_label=[]

# // Use all task
task_list = get_task_list()

# Instruction types that may appear in the training set.
keyword_list = [
    'keywords:frequency',
    'keywords:forbidden_words',
    'keywords:existence',
    'detectable_content:number_placeholders',
    "startend:end_checker"
    ]

for inst in inst_list:

    # // Leave one out
    train_inst_list = [i for i in keyword_list if i != inst]
    test_inst_list = [inst]
    print(train_inst_list)
    print(test_inst_list)

    # // Get train data
    # NOTE: with center=True/scale=True the data module already standardizes
    # each split with its own statistics.
    train_dm = DataModuleActIfevalSimple(task_path_ifeval, train_inst_list, task_list, layer=LAYER, target_token=TOKEN, center=True, scale=True)
    test_dm = DataModuleActIfevalSimple(task_path_ifeval, test_inst_list, task_list, layer=LAYER, target_token=TOKEN, center=True, scale=True)
    train_acts, train_labels = train_dm.data
    test_acts, test_labels = test_dm.data

    # // Scale and Center
    all_acts = torch.cat((train_acts, test_acts))
    print(all_acts.shape)
    # Compute the train statistics BEFORE normalizing the train set, so the
    # test set is transformed with the real train mean/std rather than with
    # the (~0, ~1) statistics of the already normalized train set.
    train_mean = torch.mean(train_acts, dim=0)
    train_std = torch.std(train_acts, dim=0)
    train_acts = (train_acts - train_mean) / train_std
    test_acts = (test_acts - train_mean) / train_std

    # // Stat of test
    succ = (test_labels==1).sum()
    fail = (test_labels==0).sum()
    print('te_succ: ', succ)
    print('te_fail: ', fail)

    # // Stat of train
    tr_succ = (train_labels==1).sum()
    tr_fail = (train_labels==0).sum()
    print('tr_succ: ', tr_succ)
    print('tr_fail: ', tr_fail)
    # Success/failure ratio of the train set; computed for reference only,
    # the probe below is trained with class_weight_one=None.
    tr_class_weight = tr_succ/tr_fail

    # // exception: AUROC is undefined when the test set has a single class
    if succ<1 or fail<1:
        continue

    # // Train probe
    probe = LRProbe.from_data(train_acts, train_labels, device=DEVICE, epochs=1000, binary_threshold=0.5, class_weight_one=None)

    # // Test
    test_prob = probe.probability(test_acts.to(DEVICE)).detach().cpu()
    auroc = roc_auc_score(test_labels, test_prob)

    print(LRProbe.__name__, ': ', auroc)
    print()
    
    # // save
    roc_list.append(auroc)
    re[inst].append(auroc)
    all_label[inst].append(test_labels)
    all_pred[inst].append(test_prob)
    total_label.append(test_labels)
    total_pred.append(test_prob)

# // Per-instruction AUROC from the pooled labels / predictions
for key in all_pred.keys():
    if len(all_pred[key])>0:
        print(key)
        label = np.concatenate(all_label[key])
        pred = np.concatenate(all_pred[key])
        final[key].append(roc_auc_score(label, pred ))

# // Compute all auc total
if total_label:
    label = np.concatenate(total_label)
    pred = np.concatenate(total_pred)
    total_auroc = roc_auc_score(label, pred)
else:
    # Every instruction type was skipped; np.concatenate would raise on [].
    total_auroc = float('nan')
print('total auroc: ', total_auroc)
    
