# Granular control over model outputs
- So far, we only have a set of arguments to work with.
- Execute the model directly to have control over logits.


In [1]:
import torch
from torch import bfloat16
import transformers

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_instruct_id = "mistralai/Mistral-7B-Instruct-v0.2"
model = transformers.AutoModelForCausalLM.from_pretrained(
    model_instruct_id,
    trust_remote_code=True,
    torch_dtype=bfloat16,
    device_map="auto"
).to(device)
tokenizer = transformers.AutoTokenizer.from_pretrained(model_instruct_id)

model.eval()

Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

MistralForCausalLM(
  (model): MistralModel(
    (embed_tokens): Embedding(32000, 4096)
    (layers): ModuleList(
      (0-31): 32 x MistralDecoderLayer(
        (self_attn): MistralAttention(
          (q_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (k_proj): Linear(in_features=4096, out_features=1024, bias=False)
          (v_proj): Linear(in_features=4096, out_features=1024, bias=False)
          (o_proj): Linear(in_features=4096, out_features=4096, bias=False)
          (rotary_emb): MistralRotaryEmbedding()
        )
        (mlp): MistralMLP(
          (gate_proj): Linear(in_features=4096, out_features=14336, bias=False)
          (up_proj): Linear(in_features=4096, out_features=14336, bias=False)
          (down_proj): Linear(in_features=14336, out_features=4096, bias=False)
          (act_fn): SiLU()
        )
        (input_layernorm): MistralRMSNorm()
        (post_attention_layernorm): MistralRMSNorm()
      )
    )
    (norm): MistralRMSNorm()
  

- obtain raw logits output by the final layer of the decoder

In [3]:
input_text = "The future of AI is"
input_tokenized = tokenizer(input_text, return_tensors="pt")
print("input_tokenized: ", input_tokenized)
input_tokenized = {k: v.to(device) for k, v in input_tokenized.items()}  # Move input tensors to the same device

with torch.no_grad():  # Prevents backpropagation
    output = model(**input_tokenized)
print("output: ", output)

input_tokenized:  {'input_ids': tensor([[    1,   415,  3437,   302, 16107,   349]]), 'attention_mask': tensor([[1, 1, 1, 1, 1, 1]])}
output:  CausalLMOutputWithPast(loss=None, logits=tensor([[[-5.5625, -5.5938, -0.2520,  ..., -4.1875, -3.2656, -3.9062],
         [-8.0625, -8.5000, -3.5469,  ..., -5.6562, -5.1250, -5.0938],
         [-8.0000, -8.3750, -4.0000,  ..., -6.0312, -5.9688, -6.8125],
         [-6.7500, -7.0000, -4.1562,  ..., -5.6250, -4.2812, -5.4062],
         [-7.9688, -8.7500, -3.5625,  ..., -6.2188, -8.7500, -6.7812],
         [-7.5938, -7.6562, -2.9688,  ..., -6.7188, -5.3750, -4.9375]]],
       device='cuda:0'), past_key_values=((tensor([[[[-9.4727e-02,  2.3071e-02,  2.3438e-01,  ..., -1.3438e+00,
           -1.9844e+00, -2.1562e+00],
          [ 5.5000e+00, -4.0000e+00, -2.6250e+00,  ...,  2.0938e+00,
            5.7031e-01, -3.1641e-01],
          [ 3.4062e+00, -4.9688e+00, -9.1797e-01,  ...,  2.3906e+00,
            1.2969e+00,  6.7188e-01],
          [-4.8125e+00, 

In [4]:
import numpy as np
np.array(output.logits.to("cpu")).shape

(1, 6, 32000)

### You can in evaluate the other output logits to determine if a text is generated
- Hypothetical scenario
  - Testing if a student is cheating on an essay
  - The student used "CheatGPT" with temperature of 0 (always selects highest probability token)
  - Student submits "Fast red car"
  - Teacher teacher observes the logits for the essay as provided by "CheatGPT".
  - CheatGPT_vocabulary = ["Fast", "red", "car", "blue", "EOS"]
  - logits_of_text = [[0, .6, 0, .4, 0, 0], [0, 0, 1, 0, 0], [0, 0, 0, 0, 1]]
  - rebuild_from_logits_using_argmax = ["Fast", "red", "car", "EOS"]

- In short in the presence of variability AI detection is unreliable.
- https://gptzero.me/
  - The OG AI detection
  - depends on randomness of text.


## Experiment controlling output with semantic similarity
<img src="control_by_semantic_similarity.JPG"
     alt="Markdown Monster icon"
     style="float: left; margin-right: 10px;" />

In [5]:
from scipy.spatial.distance import cosine


embedder_id="sentence-transformers/all-mpnet-base-v2"
embedding_tokenizer = transformers.AutoTokenizer.from_pretrained(embedder_id)
embedding_tokenizer.model_max_length = 512
embedding_model = transformers.AutoModel.from_pretrained(embedder_id)

def _get_sentence_embedding(sentence):
    inputs = embedding_tokenizer(sentence, return_tensors='pt', padding=True, truncation=True)
    with torch.no_grad():
        outputs = embedding_model(**inputs)

    # Mean Pooling - Take attention mask into account for correct averaging
    attention_mask = inputs['attention_mask']
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(outputs.last_hidden_state.size()).float()
    sum_embeddings = torch.sum(outputs.last_hidden_state * input_mask_expanded, 1)
    sum_mask = torch.clamp(input_mask_expanded.sum(1), min=1e-9)
    mean_pooled = sum_embeddings / sum_mask

    return mean_pooled[0].numpy()

def sentence_similarity(sent1, sent2):
    embedding1 = _get_sentence_embedding(sent1) if type(sent1) == str else sent1
    embedding2 = _get_sentence_embedding(sent2) if type(sent2) == str else sent2
    return 1 - cosine(embedding1, embedding2)

  return self.fget.__get__(instance, owner)()


In [6]:
def semantically_similar_generation(input_text, semantic_comparison, instruct_model, instruct_tokenizer, device, sc_weight=3, amt=10):
    """
    input_text: str, the input text to be continued
    semantic_comparison: str, the sentence to compare the generated outputs to
    instruct_model: transformers model, the model to be used for generation
    instruct_tokenizer: transformers tokenizer, the tokenizer to be used for generation
    device: torch.device, the device to be used for computation
    sc_weight: int, the weight to be used for the semantic similarity score
    amt: int, the amount of top tokens to consider

    This will generate a continuation of the input_text and return the top token that has the highest score unless the eos token is the top token.    
    """
    input_tokenized = instruct_tokenizer(input_text, return_tensors="pt")
    input_ids = input_tokenized["input_ids"].to(device)

    with torch.no_grad(): # prevents backpropagation thus preventing updating the model
        output = instruct_model(input_ids=input_ids) # the output logits of a sequence of tokens
    last_token_logits = output.logits[0, -1, :]
    probabilities = torch.nn.functional.softmax(last_token_logits)
    top_prob, top_indices = torch.topk(probabilities, amt)
    top = [ {"prob": top_prob[i], "idx": top_indices[i]} for i in range(len(top_indices))]
    is_eos = any(i["idx"] == instruct_tokenizer.eos_token_id for i in top)
    if is_eos:
        print("FOUND EOS: ", is_eos)
        return {"idx": instruct_tokenizer.eos_token_id, "prob": 1.0, "semantic_similarity": 1.0, "score": 1.0}

    for idx, i in enumerate(top):
        if (idx%10 == 0):
            print("Step: " + str(idx))
        similarity_score = sentence_similarity(instruct_tokenizer.decode(i["idx"]), semantic_comparison)
        
        similarity_score_tensor = torch.tensor(similarity_score).to(device)
        i["semantic_similarity"] = similarity_score_tensor
        i["score"] = i["prob"] * i["semantic_similarity"]**sc_weight

    argmax = max(top, key=lambda x: x["score"])
    return argmax

def ss_full_instruct_generation(input_text, semantic_comparison, instruct_model, instruct_tokenizer, device, sc_weight=3, amt=10, tab=""):
    """
    input_text: str, the input text to be continued
    semantic_comparison: str, the sentence to compare the generated outputs to
    instruct_model: transformers model, the model to be used for generation
    instruct_tokenizer: transformers tokenizer, the tokenizer to be used for generation
    device: torch.device, the device to be used for computation
    sc_weight: int, the weight to be used for the semantic similarity score
    amt: int, the amount of top tokens to consider

    This will recursively generate a instruction using semantically_similar_generation until the eos token is the top token. 
    """
    gen = semantically_similar_generation(input_text, semantic_comparison, instruct_model, instruct_tokenizer, device, sc_weight, amt)
    print("Generation: " + tab + instruct_tokenizer.decode(gen["idx"]) + " > Sc: " + str(round(float(gen["score"]))) + " | Pr: " + str(round(float(gen["prob"]))) + " | SS: " + str(round(float(gen["semantic_similarity"]))))
    if instruct_tokenizer.decode(gen["idx"]) == instruct_tokenizer.eos_token:
        return input_text + instruct_tokenizer.decode(gen["idx"])
    else:
        return ss_full_instruct_generation(input_text + instruct_tokenizer.decode(gen["idx"]), semantic_comparison, instruct_model, instruct_tokenizer, device, sc_weight, amt, tab + "  ")

In [7]:
np.array([[1,2,3]])*np.array([[1,2,3]])

array([[1, 4, 9]])

In [9]:
# last_token_logits = generation_semantic_comparison_mapper(input_text, semantic_comparison, model, tokenizer, device, sc_weight=3, amt=10)
# similarities_uf_tensor = torch.tensor(similarities_uf).to(device)
# similarities_uf_tensor = similarities_uf_tensor.view(1, -1)
# similarities_uf_tensor = similarities_uf_tensor**2
# last_token_logits = last_token_logits.view(1, -1)
# product_logit_sc = last_token_logits * similarities_uf_tensor

In [10]:
import json
with open("similarities_uf.json", "r") as f:
    similarities_uf = json.load(f)
    
def generation_semantic_comparison_mapper(input_text, semantic_comparison, instruct_model, instruct_tokenizer, device, sc_weight=3, amt=10, v=False):
    """
    input_text: str, the input text to be continued
    semantic_comparison: str, the sentence to compare the generated outputs to
    instruct_model: transformers model, the model to be used for generation
    instruct_tokenizer: transformers tokenizer, the tokenizer to be used for generation
    device: torch.device, the device to be used for computation
    sc_weight: int, the weight to be used for the semantic similarity score
    amt: int, the amount of top tokens to consider

    This will generate a continuation of the input_text and return the top token that has the highest score unless the eos token is the top token.    
    """
    input_tokenized = instruct_tokenizer(input_text, return_tensors="pt")
    input_ids = input_tokenized["input_ids"].to(device)

    with torch.no_grad(): # prevents backpropagation thus preventing updating the model
        output = instruct_model(input_ids=input_ids) # the output logits of a sequence of tokens
    last_token_logits = output.logits[0, -1, :]
    # get the products of each of the last_token_logits and similarities_uf
    similarities_uf_tensor = torch.tensor(similarities_uf).to(device)
    similarities_uf_tensor = similarities_uf_tensor.view(1, -1)
    similarities_uf_tensor = similarities_uf_tensor**sc_weight
    last_token_logits = last_token_logits.view(1, -1)
    # softmax the last_token_logits
    last_token_logits = torch.nn.functional.softmax(last_token_logits)
    product_logit_sc = last_token_logits * similarities_uf_tensor
    # get the max of the last_token_logits
    argmax = torch.argmax(product_logit_sc[0])

    if v:
        print("_Florida predicts: ", last_token_logits[0, 9500])
        print("▁California predicts: ", last_token_logits[0, 6247])
        print("_Michigan predicts: ", last_token_logits[0, 13642])
        print("product _Florida: ", product_logit_sc[0, 9500])
        print("product ▁California: ", product_logit_sc[0, 6247])
        print("product _Michigan: ", product_logit_sc[0, 13642])
        print("argmax: ", argmax)

    return argmax
def ss_comparison_full_instruct_generation(input_text, semantic_comparison, instruct_model, instruct_tokenizer, device, sc_weight=3, amt=10, tab="", v=False):
    """
    input_text: str, the input text to be continued
    semantic_comparison: str, the sentence to compare the generated outputs to
    instruct_model: transformers model, the model to be used for generation
    instruct_tokenizer: transformers tokenizer, the tokenizer to be used for generation
    device: torch.device, the device to be used for computation
    sc_weight: int, the weight to be used for the semantic similarity score
    amt: int, the amount of top tokens to consider

    This will recursively generate a instruction using semantically_similar_generation until the eos token is the top token. 
    """
    if len(tab) > 40:
        return input_text
    gen = generation_semantic_comparison_mapper(input_text, semantic_comparison, instruct_model, instruct_tokenizer, device, sc_weight, amt)
    if v:
        print("Generation: " + tab + instruct_tokenizer.decode(gen) + " > Sc: ")
    if instruct_tokenizer.decode(gen) == instruct_tokenizer.eos_token:
        return input_text + instruct_tokenizer.decode(gen)
    else:
        return ss_comparison_full_instruct_generation(input_text + instruct_tokenizer.decode(gen), semantic_comparison, instruct_model, instruct_tokenizer, device, sc_weight, amt, tab + "  ", v)

In [11]:
input_text = "<s>[INST] Select the university of your choice, only respond the name nothing else: University of ___ [/INST]"

In [12]:
print(tokenizer.tokenize("Michigan"))
print(tokenizer.convert_tokens_to_ids("▁Florida"))
print(tokenizer.convert_tokens_to_ids("▁California"))
print(tokenizer.convert_tokens_to_ids("▁Michigan"))
# get index of '▁Florida'

['▁Michigan']
9500
6247
13642


In [13]:
print(similarities_uf[9500]**100)
print(similarities_uf[6247]**100)
print(similarities_uf[13642]**100)


5.96397131910073e-29
5.698700188093311e-63
1.900214646659092e-41


In [14]:
generate_text = transformers.pipeline(
    model=model,
    tokenizer=tokenizer,
    task="text-generation",
    temperature=0.01,  
    top_k=1,  
    max_new_tokens=512, 
    repetition_penalty=1.1 
)
generated_text = generate_text(input_text)
print("Generated text: ", generated_text)

Setting `pad_token_id` to `eos_token_id`:2 for open-end generation.


Generated text:  [{'generated_text': '<s>[INST] Select the university of your choice, only respond the name nothing else: University of ___ [/INST] California, Berkeley'}]


In [32]:
tokenizer.decode(0)

'<unk>'

In [35]:
semantic_comparison = "University of Florida"
generated_instruction = ss_comparison_full_instruct_generation(input_text, semantic_comparison, model, tokenizer, device, sc_weight=200, amt=1, tab="", v=False)
print(generated_instruction)

  last_token_logits = torch.nn.functional.softmax(last_token_logits)


<s>[INST] Select the university of your choice, only respond the name nothing else: University of ___ [/INST]ufufufufufufufufufufufufufufufufufufufufuf


In [31]:
semantic_comparison = "University of Florida"
generated_instruction = ss_comparison_full_instruct_generation(input_text, semantic_comparison, model, tokenizer, device, sc_weight=1000, amt=15, tab="", v=False)
print(generated_instruction)

  last_token_logits = torch.nn.functional.softmax(last_token_logits)


<s>[INST] Select the university of your choice, only respond the name nothing else: University of ___ [/INST]<unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk>


In [31]:
semantic_comparison = "University of Florida"
generated_instruction = ss_comparison_full_instruct_generation(input_text, semantic_comparison, model, tokenizer, device, sc_weight=1000, amt=15, tab="", v=False)
print(generated_instruction)

  last_token_logits = torch.nn.functional.softmax(last_token_logits)


<s>[INST] Select the university of your choice, only respond the name nothing else: University of ___ [/INST]<unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk><unk>


In [16]:
tokenizer.decode(10474)

'unc'

In [17]:
# semantic_comparison = "University of Florida"
# import json
# def full_vocab_semantic_similarity(semantic_comparison, instruct_tokenizer):
#     """
#     input_text: str, the input text to be continued
#     semantic_comparison: str, the sentence to compare the generated outputs to
#     instruct_model: transformers model, the model to be used for generation
#     instruct_tokenizer: transformers tokenizer, the tokenizer to be used for generation
#     device: torch.device, the device to be used for computation
#     sc_weight: int, the weight to be used for the semantic similarity score
#     amt: int, the amount of top tokens to consider

#     This will generate a continuation of the input_text and return the top token that has the highest score unless the eos token is the top token.    
#     """
#     similarities = []
#     for i in range(instruct_tokenizer.vocab_size):
#         if (i%50 == 0):
#             print("Step: ", i)
#             # save to similarities.json
#             with open("similarities.json", "w") as f:
#                 f.write(json.dumps(similarities))
#         similarities.append(sentence_similarity(instruct_tokenizer.decode(i), semantic_comparison))
        
#     with open("similarities.json", "w") as f:
#         f.write(json.dumps(similarities))
# semantic_comparison_embedding = _get_sentence_embedding(semantic_comparison)
# full_vocab_semantic_similarity(semantic_comparison_embedding, tokenizer)