In [1]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
%load_ext autoreload
%autoreload 2

import sys
sys.path.append("../")
from questionnaire import LikertQuestionnaire, Questionnaire
from questionnaire import AdministerCustom
from questionnaire.utils.pct_viz import *

In [3]:
model_path = "cerebras/Cerebras-GPT-111M"

In [4]:
model = AutoModelForCausalLM.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)

In [5]:
def generate_method(
    prompt,
    **kwargs
):
    tokenized_prompt = tokenizer.encode(
        prompt,
        return_tensors='pt', 
    )
    id_last_token = tokenized_prompt.shape[1]-1
    with torch.no_grad():
        output = model.generate(
            tokenized_prompt,
            output_scores=True,
            return_dict_in_generate=True,
            **kwargs,
        )
    #return output, id_last_token
    return output["scores"][id_last_token]

In [6]:
def get_tokens_ids(
    tokenizer,
    inputs:list,
    prefixes = [],
    suffixes = [],
    token_id = -1,
    check_decode:bool=False # TODO
):
    tokens_ids_dict = {
        input: list(set(
            [tokenizer.encode(input)[token_id]]+[ # keep first token
                tokenizer.encode(pre+input)[token_id] # !!! quite sketchy
                for pre in prefixes
            ]+[
                tokenizer.encode(input+suf)[token_id]
                for suf in suffixes
            ]
        ))
        for input in inputs
    }

    if check_decode:
        for k, values in tokens_ids_dict.items():
            tokens_ids_dict[k] = [
                v for v in values
                if k in tokenizer.decode(v)
            ]
    
    return tokens_ids_dict

def get_tokens_prob(
    logits,
    token_ids:list,
    normalize:bool=True,
):
    soft_m = torch.nn.functional.softmax(logits).to('cpu')[0]
    probs = {
        k: np.sum([soft_m[id] for id in ids])
        for k, ids in token_ids.items()
    }
    
    if normalize:
        tot = np.sum(list(probs.values()))
        for k in probs.keys():
            probs[k] = probs[k]/tot
    
    return probs

def output_parser(
    logits,
    choices_keys,
):
    choice_ids = get_tokens_ids(
        tokenizer,
        choices_keys,
        prefixes = [], # TODO ?
        suffixes = [], # TODO ?
        check_decode = False,
    )
    print(choice_ids)
    probs = get_tokens_prob(
        logits, choice_ids, normalize = True,
    )
    # if hard_scores:
    #     probs = [
    #         {
    #             k:int(v==max(question_p.values()))
    #             for k,v in question_p.items()
    #         }
    #         for question_p in probs
    #     ]
    return probs



In [7]:
generate_method("This is a test")

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.




tensor([[-1.8141, -2.4483, -4.2667,  ..., -6.1063, -4.9876, -2.3713]])

In [11]:
output_parser(generate_method("The first letter of the alphabet is: "), ["A", "B", "C"])

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


{'A': [32], 'B': [33], 'C': [34]}


  soft_m = torch.nn.functional.softmax(logits).to('cpu')[0]


{'A': 0.6845887, 'B': 0.29187033, 'C': 0.023540985}

In [12]:
pct_questionnaire = LikertQuestionnaire.from_json(
    "../data/pct.json",
    **{
        "prompt_template":"You can only choose one option. Respond only with the label of your answer. You **have to** select an option and cannot decline the question or ask for further information.\n{question}\n{choices}\nYour choice:"
    }
)

In [13]:
smpl_q, smpl_c, smpl_s = pct_questionnaire[:2]

smpl_questionnaire = Questionnaire(
    categories=pct_questionnaire.categories,
    questions=smpl_q,
    choices=smpl_c,
    scores=smpl_s,
    
    index_typetr="numerical",
    choice_delim=") ",
    prompt_template="You can only choose one option. Respond only with the label of your answer. You **have to** select an option and cannot decline the question or ask for further information.\n{question}\n{choices}\nYour choice:",
)

In [14]:
len(smpl_questionnaire.make_prompts())

2

In [15]:
administer_model = AdministerCustom(
    questionnaire=smpl_questionnaire,
    generation_method=generate_method,
    output_parser=output_parser,
    generation_args={'max_new_tokens':128},
    parser_args={},
)

In [16]:
administer_model.run()

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


{'A': [32], 'B': [33], 'C': [34], 'D': [35]}
{'A': [32], 'B': [33], 'C': [34], 'D': [35]}


  soft_m = torch.nn.functional.softmax(logits).to('cpu')[0]


{'economic': 0.3682043869048357, 'social': -0.011553947742168724}

In [17]:
administer_model.answers

[{'A': 0.22224994, 'B': 0.3890147, 'C': 0.11114146, 'D': 0.27759397},
 {'A': 0.14385116, 'B': 0.08616197, 'C': 0.04874725, 'D': 0.72123957}]