In [1]:
# Dec 27 2023 work

from transformers import GPT2Tokenizer, GPT2LMHeadModel
import torch
from typing import List
import numpy as np
import os

# Send both HTTP and HTTPS traffic from this process through the local proxy
# on port 7890 (presumably needed for the `from_pretrained` downloads below —
# adjust or remove if no such proxy is running on this machine).
os.environ.update(
    http_proxy="http://127.0.0.1:7890",
    https_proxy="http://127.0.0.1:7890",
)

Some Questions:

1. The generated prompts appear to be highly random, which may make the approach unsuitable for code snippets.

2. How does CodeT5 perform under traditional code obfuscation methods?

In [9]:
# a demo of _get_generation_cache function

class LanguageModel:
    """Minimal GPT-2 wrapper demonstrating `_get_generation_cache`.

    The class loads the pretrained ``gpt2`` tokenizer and LM-head model and
    exposes one helper that returns, for every text in a batch, the hidden
    state of its last real (non-padding) token plus the key/value cache.
    """

    def __init__(self):
        self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        self.model = GPT2LMHeadModel.from_pretrained('gpt2')
        # Reuse EOS as the pad token so that `padding=True` below can pad batches.
        self.tokenizer.pad_token = self.tokenizer.eos_token  # <|endoftext|> (id:50256)
        self.device = self._select_device()
        self.model.to(self.device)

    @staticmethod
    def _select_device(preferred_index: int = 1) -> torch.device:
        """Pick the device to run on without assuming a second GPU exists.

        Returns ``cuda:<preferred_index>`` when that GPU is present, otherwise
        ``cuda:0`` when any GPU is present, otherwise the CPU. The original code
        requested ``cuda:1`` whenever CUDA was available, which fails on a
        single-GPU machine.
        """
        if not torch.cuda.is_available():
            return torch.device('cpu')
        if 0 <= preferred_index < torch.cuda.device_count():
            return torch.device(f'cuda:{preferred_index}')
        return torch.device('cuda:0')

    def _get_generation_cache(self, source_texts: List[str], past_key_values=None):
        """Encode a batch of texts and return the last-token hidden states.

        Args:
            source_texts: batch of strings, e.g. ["Hello, world!", "How are you doing?"].
            past_key_values: optional cache forwarded unchanged to the transformer.
                When a cache is supplied the transformer treats the encoded tokens
                as a continuation of the cached ones, so `source_texts` should
                then hold only the new text.

        Returns:
            (last_token_hidden_state, past_key_values): the hidden state at the
            last non-padding position of every text (one row per text), and the
            key/value cache produced by this forward pass.
        """
        token_encoding = (self.tokenizer(source_texts,
                                         padding=True,
                                         truncation=True,
                                         return_tensors='pt')
                          .to(self.device))
        # {'input_ids': tensor([[15496,11,995,0,50256],[2437,389,345,1804,30]]),
        #  'attention_mask': tensor([[1,1,1,1,0],[1,1,1,1,1]])}
        input_ids = token_encoding['input_ids']  # 2*5, note that there's a padding 50256
        # Number of real tokens per row (the mask is 1 for real tokens, 0 for padding).
        input_lengths = token_encoding['attention_mask'].sum(dim=1)
        outputs = self.model.transformer(input_ids,
                                         past_key_values=past_key_values,
                                         use_cache=True)
        # Fields: last_hidden_state (torch.Size([2, 5, 768])) past_key_values (<class 'tuple'>)
        # Index with a torch tensor on the same device as the data instead of a
        # NumPy array, so both index tensors live in the same place.
        batch_index = torch.arange(input_ids.shape[0], device=input_ids.device)
        # `input_lengths - 1` is the last real token only when padding is on the
        # right, as in the example encoding above.
        last_token_hidden_state = outputs.last_hidden_state[batch_index,
                                                            input_lengths - 1]  # torch.Size([2, 768])
        return last_token_hidden_state, outputs.past_key_values

    def main(self):
        """Run the helper once on two example sentences (result is discarded)."""
        source_texts = ["Hello, world!", "How are you doing?"]
        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            self._get_generation_cache(source_texts)


# Cell entry point: constructing LanguageModel loads the pretrained GPT-2
# tokenizer and model (see __init__); main() then runs the cache demo once.
if __name__ == "__main__":
    lm = LanguageModel()
    lm.main()

In [None]:
# a demo of text generation process

class GenerationModel:
    """Greedy text-generation demo built on pretrained GPT-2.

    Each generation step re-encodes the full (growing) texts, takes the hidden
    state of every text's last real token, projects it through the LM head and
    appends the arg-max token to the text.
    """

    def __init__(self):
        self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        self.model = GPT2LMHeadModel.from_pretrained('gpt2')
        # Reuse EOS as the pad token so that `padding=True` below can pad batches.
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)

    def _get_generation_cache(self, source_texts: List[str], past_key_values=None):
        """Encode a batch of texts and return the last-token hidden states.

        Args:
            source_texts: batch of strings, e.g. ["Hello, world!", "How are you doing?"].
            past_key_values: optional cache forwarded unchanged to the transformer.
                When a cache is supplied the transformer treats the encoded tokens
                as a continuation of the cached ones, so `source_texts` should
                then hold only the new text.

        Returns:
            (last_token_hidden_state, past_key_values): the hidden state at the
            last non-padding position of every text (one row per text), and the
            key/value cache produced by this forward pass.
        """
        token_encoding = (self.tokenizer(source_texts,
                                         padding=True,
                                         truncation=True,
                                         return_tensors='pt')
                          .to(self.device))
        # {'input_ids': tensor([[15496,11,995,0,50256],[2437,389,345,1804,30]]),
        #  'attention_mask': tensor([[1,1,1,1,0],[1,1,1,1,1]])}
        input_ids = token_encoding['input_ids']  # 2*5, note that there's a padding 50256
        # Number of real tokens per row (the mask is 1 for real tokens, 0 for padding).
        input_lengths = token_encoding['attention_mask'].sum(dim=1)
        outputs = self.model.transformer(input_ids,
                                         past_key_values=past_key_values,
                                         use_cache=True)
        # Fields: last_hidden_state (torch.Size([2, 5, 768])) past_key_values (<class 'tuple'>)
        # Index with a torch tensor on the same device as the data instead of a
        # NumPy array, so both index tensors live in the same place.
        batch_index = torch.arange(input_ids.shape[0], device=input_ids.device)
        # `input_lengths - 1` is the last real token only when padding is on the
        # right, as in the example encoding above.
        last_token_hidden_state = outputs.last_hidden_state[batch_index,
                                                            input_lengths - 1]  # torch.Size([2, 768])
        return last_token_hidden_state, outputs.past_key_values

    def generate_text(self, source_texts, max_new_tokens=5):
        """Greedily extend every text by up to `max_new_tokens` tokens.

        Args:
            source_texts: sequence of prompt strings; it is not modified.
            max_new_tokens: number of tokens appended to each text.

        Returns:
            A new list with one extended string per input text (empty list for
            empty input).
        """
        generated_texts = list(source_texts)  # work on a copy; accepts any sequence
        if not generated_texts:
            return generated_texts

        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            for _ in range(max_new_tokens):
                # The full texts are re-tokenized on every step, so the model must
                # start from an empty cache. Feeding the previous step's
                # past_key_values together with the full text (as the original
                # did) would present the whole text as a continuation of the
                # cached, padded context.
                state, _unused_cache = self._get_generation_cache(generated_texts)

                # Project all rows through the LM head at once:
                # [batch, hidden] -> [batch, vocab].
                logits = self.model.lm_head(state)
                next_token_ids = torch.argmax(logits, dim=-1).tolist()

                # Append each decoded token to its own text.
                generated_texts = [text + self.tokenizer.decode([token_id])
                                   for text, token_id in zip(generated_texts, next_token_ids)]

        return generated_texts

    def main(self):
        """Generate five tokens for two example prompts and print the results."""
        source_texts = ["Hello, Daniel Lu!", "I love computer science so much..."]
        generated_texts = self.generate_text(source_texts, max_new_tokens=5)
        for text in generated_texts:
            print(text)


# Cell entry point: constructing GenerationModel loads the pretrained GPT-2
# tokenizer and model (see __init__); main() then prints the greedy
# continuations of two example prompts.
if __name__ == "__main__":
    lm = GenerationModel()
    lm.main()

In [None]:
# a demo of top-k sampling approach

class SamplingModel:
    """Top-k sampling demo built on pretrained GPT-2.

    Each generation step re-encodes the full (growing) texts, takes the hidden
    state of every text's last real token, projects it through the LM head,
    keeps only the k most likely tokens and samples the next token from them.
    """

    def __init__(self):
        self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        self.model = GPT2LMHeadModel.from_pretrained('gpt2')
        # Reuse EOS as the pad token so that `padding=True` below can pad batches.
        self.tokenizer.pad_token = self.tokenizer.eos_token
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.to(self.device)

    def _get_generation_cache(self, source_texts: List[str], past_key_values=None):
        """Encode a batch of texts and return the last-token hidden states.

        Args:
            source_texts: batch of strings, e.g. ["Hello, world!", "How are you doing?"].
            past_key_values: optional cache forwarded unchanged to the transformer.
                When a cache is supplied the transformer treats the encoded tokens
                as a continuation of the cached ones, so `source_texts` should
                then hold only the new text.

        Returns:
            (last_token_hidden_state, past_key_values): the hidden state at the
            last non-padding position of every text (one row per text), and the
            key/value cache produced by this forward pass.
        """
        token_encoding = (self.tokenizer(source_texts,
                                         padding=True,
                                         truncation=True,
                                         return_tensors='pt')
                          .to(self.device))
        # {'input_ids': tensor([[15496,11,995,0,50256],[2437,389,345,1804,30]]),
        #  'attention_mask': tensor([[1,1,1,1,0],[1,1,1,1,1]])}
        input_ids = token_encoding['input_ids']  # 2*5, note that there's a padding 50256
        # Number of real tokens per row (the mask is 1 for real tokens, 0 for padding).
        input_lengths = token_encoding['attention_mask'].sum(dim=1)
        outputs = self.model.transformer(input_ids,
                                         past_key_values=past_key_values,
                                         use_cache=True)
        # Fields: last_hidden_state (torch.Size([2, 5, 768])) past_key_values (<class 'tuple'>)
        # Index with a torch tensor on the same device as the data instead of a
        # NumPy array, so both index tensors live in the same place.
        batch_index = torch.arange(input_ids.shape[0], device=input_ids.device)
        # `input_lengths - 1` is the last real token only when padding is on the
        # right, as in the example encoding above.
        last_token_hidden_state = outputs.last_hidden_state[batch_index,
                                                            input_lengths - 1]  # torch.Size([2, 768])
        return last_token_hidden_state, outputs.past_key_values

    def top_k_logits(self, logits, k):
        """Mask every logit outside the top `k` (along the last axis) with -inf.

        Args:
            logits: tensor whose last axis is the vocabulary; any number of
                leading axes is accepted.
            k: how many of the largest logits to keep. 0 disables filtering and
                returns `logits` unchanged; values above the vocabulary size are
                clamped to it.

        Returns:
            A tensor shaped like `logits`. Entries tied with the k-th largest
            value are kept.

        Raises:
            ValueError: if `k` is negative.
        """
        if k < 0:
            raise ValueError(f"k must be non-negative, got {k}")
        if k == 0:
            return logits  # Keep all logits

        # torch.topk raises when k exceeds the axis length, so clamp first.
        k = min(k, logits.size(-1))
        values, _ = torch.topk(logits, k)
        # Smallest kept value per row; the trailing singleton axis broadcasts
        # against `logits` for inputs of any rank.
        min_values = values[..., -1:]
        return torch.where(logits < min_values, torch.full_like(logits, float('-inf')), logits)

    def generate_text(self, source_texts, max_new_tokens=5, top_k=50, seed=None):
        """Extend every text by up to `max_new_tokens` tokens using top-k sampling.

        Args:
            source_texts: sequence of prompt strings; it is not modified.
            max_new_tokens: number of tokens appended to each text.
            top_k: number of candidate tokens kept at each step (0 keeps all).
            seed: optional integer; when given, torch's global RNG is seeded
                with it first so the sampled output is repeatable.

        Returns:
            A new list with one extended string per input text (empty list for
            empty input).
        """
        generated_texts = list(source_texts)  # work on a copy; accepts any sequence
        if not generated_texts:
            return generated_texts
        if seed is not None:
            torch.manual_seed(seed)

        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            for _ in range(max_new_tokens):
                # The full texts are re-tokenized on every step, so the model must
                # start from an empty cache. Feeding the previous step's
                # past_key_values together with the full text (as the original
                # did) would present the whole text as a continuation of the
                # cached, padded context.
                state, _unused_cache = self._get_generation_cache(generated_texts)

                # Project all rows through the LM head at once:
                # [batch, hidden] -> [batch, vocab].
                logits = self.model.lm_head(state)

                # Apply top-k filtering, then sample one token per row; each token
                # is drawn with probability proportional to its softmax weight.
                filtered_logits = self.top_k_logits(logits, top_k)
                probabilities = torch.nn.functional.softmax(filtered_logits, dim=-1)
                next_token_ids = torch.multinomial(probabilities, 1).squeeze(1).tolist()

                # Append each decoded token to its own text.
                generated_texts = [text + self.tokenizer.decode([token_id])
                                   for text, token_id in zip(generated_texts, next_token_ids)]

        return generated_texts

    def main(self, seed=None):
        """Sample five tokens for two example prompts and print the results.

        Pass `seed` to make the printed samples repeatable.
        """
        source_texts = ["Hello, Daniel Lu!", "I love computer science so much..."]
        generated_texts = self.generate_text(source_texts, max_new_tokens=5, seed=seed)
        for text in generated_texts:
            print(text)


# Cell entry point: constructing SamplingModel loads the pretrained GPT-2
# tokenizer and model (see __init__); main() then prints top-k sampled
# continuations of two example prompts.
if __name__ == "__main__":
    lm = SamplingModel()
    lm.main()

Ideas:

1. Run more CodeT5 experiments on name/data/flow obfuscation to find flaws.

2. The null prompt has been discussed; the next focus is finding better variable names.

3. Does 'space'/'return' matter? Follow up with more dramatic changes.

4. Survey more of the literature on how to process code.