In [75]:
"""
    Idea is to start with one token, compute tokens that maximize probability and retain top-k, then
    for each resulting sequence add one token and continue recursively. Kind of like beam-search
"""
import json
import transformers
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
import torch as ch
from torch.nn import functional as F
import numpy as np
from tqdm import tqdm
from typing import List

In [None]:
def load_targets(path: str = "./data/dev/targets_test.json"):
    with open(path, 'r') as f:
        data = json.load(f)
    return data

In [284]:
def is_ascii(s):
    return s.isascii() and s.isprintable()

In [13]:
tokenizer = AutoTokenizer.from_pretrained("TDC2023/trojan-base-pythia-1.4b", padding_side='left')
tokenizer.add_special_tokens({'pad_token': '<|endoftext|>'})
model = AutoModelForCausalLM.from_pretrained("TDC2023/trojan-base-pythia-1.4b", torch_dtype=ch.float16).cuda().eval()

In [285]:
all_tokens = list(tokenizer.get_vocab().keys())
# Filter out special tokens
all_tokens = [x for x in all_tokens if is_ascii(x) and x not in [tokenizer.bos_token, tokenizer.eos_token, tokenizer.unk_token, tokenizer.pad_token]]
# Only consider ascii-printable

In [293]:
# Also keep actual trojan data
actual_trojans = load_targets("./data/dev/base/trojan_specifications_train_dev_base.json")

In [287]:
x = tokenizer.encode(load_targets()[0])

In [309]:
@ch.no_grad()
def calculate_perplexity(input_tokens: List[int], output_tokens: List[int]):
    """
        Given inputs, compute probability of generating specific output
    """
    scores = 0.0
    tokens_extra = []
    for o in output_tokens:
        input_tokens_send = ch.Tensor(input_tokens + tokens_extra).long().unsqueeze(0).cuda()
        output = model(input_tokens_send)
        # Look at logits of specific token
        scores += output.logits[0, 0, o]
        # Pretend this was indeed generated
        tokens_extra.append(o)

    perplexity = ch.exp(scores / len(output_tokens)).item()

    return perplexity

In [360]:
def beam_search_helper(seq_so_far: List[int], target_seq: List[int], n_pick: int, top_k: int):
    random_picked = np.random.randint(0, len(all_tokens), n_pick)
    ppls = []
    for i in random_picked:
        seq_new = seq_so_far + [i]
        # Make sure this sequence has same length as target
        ppls.append(calculate_perplexity(seq_new, target_seq))
    
    # Pick top K candidates, and their scores
    wanted = np.argsort(ppls)[:top_k]
    scores = np.array(ppls)[wanted]
    
    # Return said sequences
    return [seq_so_far + [random_picked[i]] for i in wanted], scores

In [379]:
def beam_search(target_seq: List[int]):
    candidates, scores = [[]], [np.inf]
    # Everything is between 5 and 40 tokens long
    max_length = 5 #40
    min_length = 5
    n_pick= 10 # 50
    top_k = 5 # 10
    candidates_at_any_point = 15
    
    for i in tqdm(range(max_length)):
        # Run for each candidate
        c_new, s_new = [], []
        for cand in candidates:
            # Use large set for start
            if i == 0:
                s, c = beam_search_helper(cand, target_seq, 200, top_k)
            else:
                s, c = beam_search_helper(cand, target_seq, n_pick, top_k)
            c_new.extend(s)
            s_new.extend(c)

        # Add to pool
        candidates += c_new
        scores += s_new

        # Keep only top candidates_at_an_point candidates
        best_indices = np.argsort(scores)[:candidates_at_any_point]
        candidates = [x for i, x in enumerate(candidates) if i in best_indices]
        scores = [x for i, x in enumerate(scores) if i in best_indices]

    """
    # Prune out candidates that have length < min_length
    c_kept, s_kept = [], []
    for c, s in zip(candidates, scores):
        if len(c) >= min_length:
            c_cekt.append(c)
            s_kept.append(s)
    """
    
    s_kept, c_kept = np.array(scores), candidates
    
    # Return top 20 candidates
    keep = np.argsort(s_kept)[:20]
    
    texts = [tokenizer.decode(x) for i, x in enumerate(c_kept) if i in keep]
    scores = s_kept[keep]
    
    return texts, scores

In [380]:
t, s = beam_search(x)

  0%|                                                                                                                                                                                           | 0/5 [01:22<?, ?it/s]


KeyboardInterrupt: 

In [378]:
test = "\n        ounText"
inputs = tokenizer(test, return_tensors="pt").input_ids.cuda()
generation_output = model.generate(input_ids=inputs)
print(tokenizer.decode(generation_output.cpu()[0]))

The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.



        ounText.setText(text);
        ounText.setTypeface(Typeface
