<a href="https://colab.research.google.com/github/Kuper994/TML-project/blob/main/TML_Project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Env Setup

In [1]:
# !pip install -U torchtext==0.18.0


In [2]:
from google.colab import drive
import sys
import os

# Mount Google Drive, then make the project folder the working directory and
# append it to sys.path.
project_dir = '/content/drive/MyDrive/Trutworthy-ML/Project'
drive.mount('/content/drive')
os.chdir(project_dir)
sys.path.append(os.path.abspath(project_dir))

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import transformers
import random
import pickle
import copy
from tqdm import tqdm
import numpy as np


# Use the first CUDA device when torch reports one as available, else the CPU.
device_name = 'cuda:0' if torch.cuda.is_available() else 'cpu'
device = torch.device(device_name)

## Llama model

In [4]:
from transformers.pipelines.text_generation import ReturnType, Chat
from typing import Union, Sequence, Any


class OurPipeline(transformers.pipelines.TextGenerationPipeline):
    """Text-generation pipeline that also hands back the raw ``generate`` output.

    Behaviour defined by this class:

    * ``_forward`` always calls ``generate`` with ``output_scores=True`` and
      ``return_dict_in_generate=True``.
    * ``postprocess`` returns a ``(model_outputs, records)`` tuple, so callers
      can read ``model_outputs["generated_sequence"]["scores"]`` next to the
      decoded records.
    """

    def _forward(self, model_inputs, **generate_kwargs):
        """Run ``self.model.generate`` on the tokenized prompt.

        Args:
            model_inputs: Mapping with ``"input_ids"``, ``"prompt_text"`` and
                optionally ``"attention_mask"``. ``"prompt_text"`` is popped.
            **generate_kwargs: Forwarded to ``generate``. ``prefix_length`` is
                consumed here and not forwarded. ``output_scores`` and
                ``return_dict_in_generate`` are always overridden to ``True``.

        Returns:
            Dict with ``"generated_sequence"`` (whatever ``generate``
            returned), ``"input_ids"`` (``None`` for an empty prompt) and
            ``"prompt_text"``.
        """
        input_ids = model_inputs["input_ids"]
        attention_mask = model_inputs.get("attention_mask", None)
        # Allow empty prompts
        if input_ids.shape[1] == 0:
            input_ids = None
            attention_mask = None
        prompt_text = model_inputs.pop("prompt_text")

        # If there is a prefix, we may need to adjust the generation length. Do so without permanently modifying
        # generate_kwargs, as some of the parameterization may come from the initialization of the pipeline.
        prefix_length = generate_kwargs.pop("prefix_length", 0)
        if prefix_length > 0:
            has_max_new_tokens = "max_new_tokens" in generate_kwargs or (
                "generation_config" in generate_kwargs
                and generate_kwargs["generation_config"].max_new_tokens is not None
            )
            if not has_max_new_tokens:
                generate_kwargs["max_length"] = generate_kwargs.get("max_length") or self.model.config.max_length
                generate_kwargs["max_length"] += prefix_length
            has_min_new_tokens = "min_new_tokens" in generate_kwargs or (
                "generation_config" in generate_kwargs
                and generate_kwargs["generation_config"].min_new_tokens is not None
            )
            if not has_min_new_tokens and "min_length" in generate_kwargs:
                generate_kwargs["min_length"] += prefix_length

        # postprocess and downstream scoring need the dict-style output with scores,
        # so force both flags here. Setting them inside generate_kwargs (instead of
        # passing them as extra literals) avoids a duplicate-keyword TypeError when a
        # caller supplies either flag.
        generate_kwargs["output_scores"] = True
        generate_kwargs["return_dict_in_generate"] = True

        generated_sequence = self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            **generate_kwargs,
        )
        return {"generated_sequence": generated_sequence, "input_ids": input_ids, "prompt_text": prompt_text}

    def forward(self, model_inputs, **forward_params):
        """Call ``_forward`` inside the pipeline's device-placement context.

        For the ``"pt"`` framework the inputs are moved to ``self.device`` and the
        outputs are moved to the CPU; for ``"tf"`` the inputs are flagged with
        ``training=False``.

        Raises:
            ValueError: If ``self.framework`` is neither ``"tf"`` nor ``"pt"``.
        """
        with self.device_placement():
            if self.framework == "tf":
                model_inputs["training"] = False
                model_outputs = self._forward(model_inputs, **forward_params)
            elif self.framework == "pt":
                inference_context = self.get_inference_context()
                with inference_context():
                    model_inputs = self._ensure_tensor_on_device(model_inputs, device=self.device)
                    model_outputs = self._forward(model_inputs, **forward_params)
                    model_outputs = self._ensure_tensor_on_device(model_outputs, device=torch.device("cpu"))
            else:
                raise ValueError(f"Framework {self.framework} is not supported")
        return model_outputs

    def postprocess(self, model_outputs, return_type=ReturnType.FULL_TEXT, clean_up_tokenization_spaces=True):
        """Decode the generated token ids and return them with the raw outputs.

        Args:
            model_outputs: The dict produced by ``_forward``.
            return_type: ``ReturnType.TENSORS`` yields token-id records;
                ``NEW_TEXT`` yields only the continuation; ``FULL_TEXT``
                prepends the prompt (or appends an assistant message for a
                ``Chat`` prompt).
            clean_up_tokenization_spaces: Forwarded to ``tokenizer.decode``.

        Returns:
            ``(model_outputs, records)`` where ``records`` holds one dict per
            generated sequence, keyed ``"generated_token_ids"`` or
            ``"generated_text"``.
        """
        # Read the token ids by key rather than by position so this does not depend
        # on the field order of the generate output.
        generated_sequence = model_outputs["generated_sequence"]["sequences"]
        input_ids = model_outputs["input_ids"]
        prompt_text = model_outputs["prompt_text"]
        generated_sequence = generated_sequence.numpy().tolist()
        records = []
        for sequence in generated_sequence:
            if return_type == ReturnType.TENSORS:
                record = {"generated_token_ids": sequence}
            elif return_type in {ReturnType.NEW_TEXT, ReturnType.FULL_TEXT}:
                # Decode text
                text = self.tokenizer.decode(
                    sequence,
                    skip_special_tokens=True,
                    clean_up_tokenization_spaces=clean_up_tokenization_spaces,
                )

                # Length (in characters) of the decoded prompt, so it can be cut
                # off the front of the decoded full sequence.
                if input_ids is None:
                    prompt_length = 0
                else:
                    prompt_length = len(
                        self.tokenizer.decode(
                            input_ids[0],
                            skip_special_tokens=True,
                            clean_up_tokenization_spaces=clean_up_tokenization_spaces,
                        )
                    )

                all_text = text[prompt_length:]
                if return_type == ReturnType.FULL_TEXT:
                    if isinstance(prompt_text, str):
                        all_text = prompt_text + all_text
                    elif isinstance(prompt_text, Chat):
                        # Explicit list parsing is necessary for parsing chat datasets
                        all_text = list(prompt_text.messages) + [{"role": "assistant", "content": all_text}]

                record = {"generated_text": all_text}
            records.append(record)
        return model_outputs, records



In [None]:
prefix = "The recent advances in computational biology are"
# Take the Hugging Face access token from the HF_TOKEN environment variable so it
# never has to be pasted into the notebook; an unset variable keeps the previous
# empty-string value.
access_token = os.environ.get("HF_TOKEN", "")
# Keep the checkpoint id under its own name so `model` only ever refers to the
# loaded network.
model_name = "meta-llama/Llama-2-7b-chat-hf"
# model_name = "meta-llama/Meta-Llama-3-8B-Instruct"

tokenizer = AutoTokenizer.from_pretrained(model_name, token=access_token)

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    token=access_token,
)



Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [None]:

# Build the text-generation pipeline around the already-loaded model and
# tokenizer, using OurPipeline as the pipeline class.
pipeline = transformers.pipeline(
    "text-generation",
    model=model,
    tokenizer=tokenizer,
    torch_dtype=torch.float16,
    device=device,
    pipeline_class=OurPipeline,
)

In [None]:
# Token ids of the target sentence that generated text is scored against.
# add_special_tokens=False stops the tokenizer from adding special tokens around
# the sentence, so only the sentence's own tokens are scored.
target = tokenizer.encode(
    "NLP models have been increasingly applied in the field of biology",
    add_special_tokens=False,
)


In [None]:
def fitness(sequence, pipeline, target_tokens, tokenizer, min_value=-10e2, max_length=300):
    """Score how strongly a prompt steers generation towards ``target_tokens``.

    The prompt is run through ``pipeline`` with top-k sampling. Every window of
    ``len(target_tokens)`` consecutive generation steps is scored by summing the
    score that each step assigns to the corresponding target token, and the
    best window total is returned.

    Args:
        sequence: Prompt passed to ``pipeline``.
        pipeline: Callable whose result's first element exposes
            ``["generated_sequence"]["scores"]``, a per-step sequence of score
            tensors.
        target_tokens: Token ids to look for, in order.
        tokenizer: Supplies ``eos_token_id`` for the generation call.
        min_value: Substitute for ``-inf`` scores, so a single ``-inf`` entry
            does not turn a whole window total into ``-inf``.
        max_length: Forwarded to ``pipeline`` as ``max_length``.

    Returns:
        The highest window total, or ``-np.inf`` when fewer than
        ``len(target_tokens)`` steps were generated.
    """
    outputs = pipeline(
        sequence,
        do_sample=True,
        top_k=10,
        num_return_sequences=1,
        eos_token_id=tokenizer.eos_token_id,
        max_length=max_length,
    )
    scores = outputs[0]['generated_sequence']['scores']
    window = len(target_tokens)
    best_fit = -np.inf

    # Slide over every start position at which the whole target still fits; the
    # range is empty when too few steps were generated.
    for start in range(len(scores) - window + 1):
        fit = [scores[start + i].squeeze()[token] for i, token in enumerate(target_tokens)]
        fit = [min_value if x == -np.inf else x for x in fit]
        sum_fit = sum(fit)

        if sum_fit > best_fit:
            best_fit = sum_fit

    return best_fit

# Print the fitness of a few hand-written prompts against the target.
for prompt in (
    "Hello",
    "how is the pm of israel",
    "what is comuptitonal biology",
    "is NLP had been applied to biology",
):
    print(fitness(prompt, pipeline, target, tokenizer))


In [None]:
# all_words = list(tokenizer.vocab.keys())
# prompts = []
# for _ in range(200):
#   prompts.append(' '.join([w for w in random.sample(dictionary, 10)]))

# print("prompt:", prompt)
# Generate one sampled continuation (top-k = 10, at most 400 tokens) for a fixed prompt.
sequences = pipeline(
    'How NLP models been applied in the field of biology',
    do_sample=True,
    top_k=10,
    num_return_sequences=1,
    eos_token_id=tokenizer.eos_token_id,
    max_length=400,
)

# for seq in sequences:
#   print(f"{seq['generated_text']}")

Truncation was not explicitly activated but `max_length` is provided a specific value, please use `truncation=True` to explicitly truncate examples to max length. Defaulting to 'longest_first' truncation strategy. If you encode pairs of sequences (GLUE-style) with the tokenizer you can select this strategy more precisely by providing a specific strategy to `truncation`.


## Genetic algorithm

In [24]:
def create_initial_population(vocabulary, prompt_len, population_size: int = 200):
    """Build the starting population of random prompts.

    Args:
        vocabulary: Sequence of tokens to draw from.
        prompt_len: Number of tokens per prompt; tokens within one prompt are
            drawn without replacement.
        population_size: Number of prompts to create.

    Returns:
        A list of exactly ``population_size`` prompts, each a list of
        ``prompt_len`` tokens.

    Raises:
        ValueError: From ``random.sample`` when ``prompt_len`` exceeds
            ``len(vocabulary)``.
    """
    return [random.sample(vocabulary, prompt_len) for _ in range(population_size)]

def fitness(sentence, pipeline, tokenizer):
  """Placeholder fitness: ignores its arguments and returns 1 for every sentence.

  NOTE(review): this reuses the name ``fitness`` and therefore replaces the
  earlier six-parameter scoring function once this cell runs; with a constant
  score the genetic search cannot rank candidates.
  """
  return 1

def find_maximal_prompt(pipeline, tokenizer, prompt_len: int = 7, population_size: int = 200, num_iterations: int = 150):
    """Search for high-fitness prompts with a simple genetic algorithm.

    Each generation keeps the current population, adds single-point crossover
    children and randomly mutated copies, then keeps the ``population_size``
    candidates with the highest ``fitness``.

    Args:
        pipeline: Passed through to ``fitness``.
        tokenizer: Provides the vocabulary (``tokenizer.vocab``) and is passed
            through to ``fitness``.
        prompt_len: Number of tokens per prompt.
        population_size: Survivors kept per generation; also the number of
            crossover pairs and of mutation attempts per generation.
        num_iterations: Number of generations.

    Returns:
        The first five prompts of the final, fitness-sorted population (fewer if
        the population is smaller), each a list of vocabulary tokens.
    """
    vocab = list(tokenizer.vocab.keys())
    population = create_initial_population(vocab, prompt_len=prompt_len, population_size=population_size)

    for _generation in tqdm(range(num_iterations)):
        new_population = copy.deepcopy(population)

        # Crossover: population_size parent pairs, two children per pair. The pivot
        # is drawn from 1..prompt_len-1 so both children mix tokens from both
        # parents; a pivot of 0 would only reproduce the parents. With fewer than
        # two positions no such pivot exists, so crossover is skipped.
        if prompt_len > 1:
            for _pair in range(population_size):
                sample1, sample2 = random.sample(population, 2)
                pivot_idx = random.randrange(1, prompt_len)
                new_population.append(sample1[:pivot_idx] + sample2[pivot_idx:])
                new_population.append(sample2[:pivot_idx] + sample1[pivot_idx:])

        # Mutation: population_size attempts; each replaces a random number of
        # positions (possibly none, in which case nothing is added) with random
        # vocabulary tokens.
        for _attempt in range(population_size):
            sample = random.choice(population)
            num_muts = random.randint(0, prompt_len)
            if not num_muts:
                continue
            mut_sample = list(sample)
            for mut_idx in random.sample(range(prompt_len), num_muts):
                mut_sample[mut_idx] = random.choice(vocab)
            if mut_sample not in new_population:
                new_population.append(mut_sample)

        # Selection: keep the fittest population_size candidates, best first.
        population = sorted(
            new_population, reverse=True, key=lambda s: fitness(s, pipeline, tokenizer))[: population_size]

    return population[:5]


# Run the search with its default settings; the returned top-5 prompts are this cell's output.
find_maximal_prompt(pipeline, tokenizer)

100%|██████████| 150/150 [00:00<00:00, 179.63it/s]


[['▁calculus', 'Es', 'oslav', '∀', '▁Wes', 'ña', 'icio'],
 ['<0xDB>', '▁Orts', 'ต', '▁impro', 'format', 'ertain', '▁life'],
 ['▁convenience', '▁gelang', 'Name', '▁bat', 'ە', '▁diag', '▁policy'],
 ['▁express', 'té', 'het', '.,', '▁significance', '▁powershell', '▁сло'],
 ['ira', 'gender', 'iot', '▁сельсов', 'buf', '▁réseau', 'om']]