# Imports

In [102]:
import ast
import numpy as np
import time 
import re

import torch
import torch.nn.functional as F

from transformers import AutoTokenizer, AutoModel

from openai import OpenAI
import tiktoken

In [2]:
data = np.load('datasets/emoji_apple_style.npy', allow_pickle=True).item()

In [3]:
data.keys()

dict_keys(['images', 'labels', 'embeddings', 'color_palette'])

In [9]:
data = list(data)

In [10]:
# sentence embedding model
# https://huggingface.co/sentence-transformers/multi-qa-MiniLM-L6-cos-v1

# limit of 512 word pieces, trained on length of 250 word pieces and might not work for longer texts

uri_name = "sentence-transformers/multi-qa-MiniLM-L6-cos-v1"
tokenizer = AutoTokenizer.from_pretrained(uri_name)
model = AutoModel.from_pretrained(uri_name)



# Feature Generation

## Create embeddings

In [46]:
# Mean Pooling - Take average of all tokens
# see: https://huggingface.co/sentence-transformers/multi-qa-MiniLM-L6-cos-v1#pytorch-usage-huggingface-transformers
def _mean_pooling(model_output, attention_mask):
    token_embeddings = model_output.last_hidden_state
    input_mask_expanded = attention_mask.unsqueeze(-1).expand(token_embeddings.size()).float()
    return torch.sum(token_embeddings * input_mask_expanded, 1) / torch.clamp(input_mask_expanded.sum(1), min=1e-9)

In [47]:
def _tokenize(texts, tokenizer, max_length):
    """
    Generate token for input text. 
    :param texts: a list of input sentences or texts to be processed
    :param tokenizer: a Hugging Face tokenizer instance
    :param max_length: an optional parameter for padding/truncation of text strings
    :return: encoded inputs 
    """
    
    padding = True if max_length == 0 else 'max_length'
    
    # __call__ the tokenizer 
    return tokenizer(
        texts, 
        padding=padding,
        truncation=True,
        max_length=max_length,  # if left unset, uses model default 
        return_tensors='pt'  # return as torch tensors 
    )    

In [48]:
def _embed(encoded_input, model):
    """
    Take tokenized values and generate embeddings
    :param encoded_input: encoded inputs generated by tokenizer 
    :param model: a Hugging Face model instance
    :return: embedding vector
    """
    
    # Compute token embeddings
    with torch.no_grad():  # only need forward pass here
        embeddings_words = model(**encoded_input, return_dict=True)

    # Perform mean pooling
    # The attention mask ensures that padding tokens do not contribute to the averaged embedding.
    # purpose is to take variable length sequences and output fixed length ones 
    embeddings_sentence = _mean_pooling(embeddings_words, encoded_input['attention_mask'])

    # Normalize embeddings -- L2 = 1
    embeddings_sentence = F.normalize(embeddings_sentence, p=2, dim=1)
    
    return embeddings_sentence, embeddings_words

In [49]:
def get_sent_word_embeddings(labels, model, tokenizer, max_length=None):
    """
    Generate sentence embedding for input texts
    :param model: Hugging Face model instance
    :param tokenizer: Hugging Face tokenizer instance
    :param labels: input labels from our training data 
    :param max_length: maximum length for padding/truncation fo input stirngs
    :return: both mean-pooled sentence embedding and masked word embeddings 
    """
    encoded = _tokenize(labels, tokenizer, max_length)
    embeddings_sentences, embeddings_words = _embed(encoded, model)

    embeddings_words = embeddings_words['last_hidden_state'].detach().cpu().numpy()
    embeddings_sentences = embeddings_sentences.detach().cpu().numpy()

    return embeddings_sentences, embeddings_words

# Augmentation

## Gaussian Noise

In [None]:
def add_noise_to_embeddings(embeddings, num_augmentations, noise_std_dev=0.01):
    """
    
    :param embeddings: input list of embeddings generated by get_sent_word_embeddings()
    :param num_augmentations: number of noise variations to add
    :param noise_std_dev: the standard deviation of multiplicative gaussian noise to add 
    :return: list of augmented embeddings and a list of their original embedding indices
    """
    augmented_embeddings, augmented_idxs = [], []
    
    # Add Gaussian noise to the embeddings
    for i, embedding in enumerate(embeddings):
        for _ in range(num_augmentations):
            # mean of 1, because this is multiplicative, since we want values that are 0 (or close) to stay 0
            noise = np.random.normal(1, noise_std_dev, embedding.shape)
            augmented_embedding = embedding * noise

            augmented_embeddings.append(augmented_embedding)
            augmented_idxs.append(i)
    
    # only return augmented values
    return augmented_embeddings, augmented_idxs

## Mixup (Random)

In [131]:
def mixup_aug(imgs, embeddings, n_mixups=1, lmbda=0.5):
    """
    Applies mixup by randomly picking another image and soft updating the image and embedding
    to produce a new sample. 
    
    :param imgs: raw pre OHT images 
    :param embeddings: corresponding embedding vectors 
    :param n_mixups: number of random observations to mix up 
    :param lmbda: soft update parameter 
    :return: augmented images, embeddings, and original idxs 
    """
    augmented_imgs, augmented_embeddings, augmented_idxs = [], [], []
    n_embeddings = len(embeddings)
    
    for i, (img, embedding) in enumerate(zip(imgs,embeddings)):
        # Randomly select n_mixups indices without replacement
        mixup_indices = np.random.choice(n_embeddings, n_mixups, replace=False)
        
        # make sure we're logging what was picked for debugging purposes 
        augmented_idxs.append([i] + mixup_indices)
        
        # for each sampled index... 
        for idx in mixup_indices:
            # Get the corresponding levels, labels, and embeddings
            mix_img = imgs[idx]
            mix_embedding = embeddings[idx]
            
            # Interpolate the levels, labels, and embeddings
            augmented_img = lmbda * img + (1 - lmbda) * mix_img
            augmented_embedding = lmbda * embedding + (1 - lmbda) * mix_embedding
            
            # Append the new data to the augmented lists
            augmented_imgs.append(augmented_img)
            augmented_embeddings.append(augmented_embedding)

    # only return the augmented values
    return augmented_imgs, augmented_embeddings, augmented_idxs

## GPT

### Load API key

In [117]:
with open('apikey.env', 'r') as file:
    # Read the content of the file
    api_key = file.read().strip() 

In [118]:
client = OpenAI(api_key=api_key)

### Estimate API token usage

In [119]:
def _compute_tokens_from_payload(payload, encoding):
    """
    Estimate the number of tokens required in the request. 
    See: https://cookbook.openai.com/examples/how_to_count_tokens_with_tiktoken
    
    :param payload: an array of dicts or an array of strings 
    :param encoding: a tiktoken encoder instance 
    :return: a count of tokens 
    """
    num_tokens = 0
    for message in payload:
        num_tokens += 4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
        
        # if we pass a wellformed payload 
        for key, value in message.items():
            num_tokens += len(encoding.encode(value))
                
            # if there's a name, the role is not processed and so doesn't count to api usage
            if key == "name":  
                num_tokens += -1 
    
    # pad the estimate with the structure of response; note: does not include actual response
    # every reply is primed with <im_start>assistant     
    num_tokens += 2  
    
    return num_tokens

In [120]:
def _compute_tokens_from_list(labels, encoding):
    """
    Estimate the number of tokens required in the request. 
    See: https://cookbook.openai.com/examples/how_to_count_tokens_with_tiktoken
    
    :param labels: a list of string labels 
    :param encoding: a tiktoken encoder instance 
    :return: a count of tokens 
    """
    return [len(encoding.encode(l)) for l in labels]


### API calls and packaging

In [121]:
def _create_payload(labels):
    prompt = """Take each string in the list provided, and write an alternate label for each one. These strings describe an image of a pixel video game map. These alternate labels should describe the same image as the original label, but use different words and a different sentence structure. Use simple or common words when writing the alternate labels. Assume you have the vocabulary of a 10 year old. Your output should have the same number of strings as the input list."""

    return [
        {"role": "system", "content": "You are a helpful assistant with excellent attention to detail. You only output python lists of strings according to the instructions you are given. Output the list on a single line, without any newlines. Make sure every list is closed properly"},
        {"role": "user", "content": f"{prompt} Here is the list of labels: {labels}"},            
    ]

In [122]:
def _call_gpt(messages, model):
    return client.chat.completions.create(model=model, messages=messages)
    # return openai.ChatCompletion.create(model=model, messages=messages)

In [123]:
# segment the data into sublists to not exceed api limits

# tiktoken is a fast open-source tokenizer by OpenAI.

# Given a text string (e.g., "tiktoken is great!") and an encoding (e.g., "cl100k_base"), a tokenizer can split the text string into a list of tokens (e.g., ["t", "ik", "token", " is", " great", "!"]).
# 
# Splitting text strings into tokens is useful because GPT models see text in the form of tokens. Knowing how many tokens are in a text string can tell you (a) whether the string is too long for a text model to process and (b) how much an OpenAI API call costs (as usage is priced by token).
def _chunk_labels(labels, encoding, threshold):
    """
    
    :param labels: feature inputs as a list of strings
    :param encoding: a tiktoken encoder instance
    :param threshold: a maximum token size to chunk the inputs into 
    :return: 
    """

    current_chunk, chunks = [], []
    count_tokens = 0
    label_tokens = _compute_tokens_from_list(labels, encoding)

    # step throw all labels 
    for label, tokens in zip(labels, label_tokens):
        print(label)
        print(tokens)
        # append labels to the current chunk and update our count
        current_chunk.append(label)
        count_tokens += tokens 
        
        # if the array exceeds the token threshold then.... 
        if count_tokens + 2 > threshold:
            
            # remove the last label 
            hold = current_chunk.pop()
            
            # complain if the label itself is so big that it's as big as the threshold
            if tokens > threshold:
                raise Exception(f"Label {label} is too big: *{tokens} tokens* for this threshold: *{threshold} tokens*")
            
            # since we removed the offending label, the current chunk should be the right size
            chunks.append(current_chunk)
            
            # start a new arr with the one we popped out and reset our counter
            current_chunk = [hold] 
            count_tokens = 0
            
    # add the final set of labels 
    chunks.append(current_chunk)
    
    return chunks

In [71]:
# newunseen_sprite = [
#     "a buff man with a blue headband and red shirt", 
#     "a blue duck with a red headband", 
#     "a green woman with blonde hair", 
#     "a dog with a black hat", 
#     "a man with blue shoes, a red hat, and a green shirt"
# ]
# model = 'gpt-4o-mini'
# encoding = tiktoken.encoding_for_model(model)
# 
# _chunk_labels(newunseen_sprite, encoding, 1)

a buff man with a blue headband and red shirt
11


Exception: Label a buff man with a blue headband and red shirt is too big: *11 tokens* for this threshold: *1 tokens*

In [124]:
def _process_result(result):
    """
    Filter out patterns that look like [' and '] 
    TODO: why don't we just look at those patterns directly instead of regex? 
    :param result: raw openAI API response 
    :return: processed answers 
    """
    answer = result.choices[0].message.content                
    apostrophe_pattern = r"(?<=\w)'(?=[^,\]])|'(?=\w+?'\s)"
    answer = re.sub(apostrophe_pattern, '', answer)
    
    idx_open = answer.find("[")
    idx_close = answer.find(']') + 1 # +1 since indexing ignores current spot 
    
    return ast.literal_eval(answer[idx_open:idx_close])

### Overall function

In [125]:
def get_gpt_alt_labels(labels, model='gpt-4o-mini', num_retries=3, debug=True):
    """
    Call GPT to generate alternate labels. 
    :param labels: list of human-annotated labels 
    :param num_retries: how many times do we try again? 
    :return: (a list of alternate labels, api call response status)
    """

    start = time.time()
    
    # it's probably clk100k_base (can pass to get_encoding()), but let's not assume
    encoding = tiktoken.encoding_for_model(model)
    prompt_size = _compute_tokens_from_payload(_create_payload([]), encoding)
    
    # chunk the prompt to the right size; if we pass in everything, it's gonna timeout/fail
    threshold = 4000 - prompt_size 
    chunks = _chunk_labels(labels, encoding, threshold=threshold)

    
    if debug:
        print(f'Prompt size: {prompt_size}')
        print(f"split time = {time.time() - start}")
        print("Number of loops: ", len(chunks))
        
    alt_labels = []
    

    for i, chunk in enumerate(chunks):
        tries = 0
        success = False
        start = time.time()
        
        if debug:
            print(f"Loop {i} running through array of size {len(labels)}")

        while not success and tries < num_retries:
            payload = _create_payload(chunk)
            result = _call_gpt(payload, model)
            alt_chunk = _process_result(result)

            n_labels = len(chunk)
            n_alts = len(alt_chunk)
            
            if n_labels == n_alts:
                success = True
            else:
                tries += 1
                
                print(f'FAILED. {n_labels} labels but {n_alts} alts!')
                print(f'attempting retry # {tries}')
                
        if success:
            alt_labels += alt_chunk
        else:
            print(f"failed completely after {num_retries} retries.")
            return
        
        if debug:
            print(f"api call time = {time.time() - start}")

    return alt_labels

### GPT Label Mixup

In [132]:
# interpolate n times between a label and its altlabel (MUST BE CALLED RIGHT AFTER GPT AUG)
def altlabel_interp_aug(embeddings, alt_embeddings, n_steps=1):
    """
    A different form of mixup where we take the original embedding and the GPT embedding and 
    generate additional embeddings in between via interpolation. 
    
    The original method randomly picked a label but that's probably not useful here? 
    
    :param embeddings: list of original embeddings 
    :param alt_embeddings: list of GPT embeddings
    :param n_steps: number interpolated samples to draw 
    :return: list of interpolated embeddings and list of original indices 
    """
    interpolated_embeddings, interpolated_idxs = [], []
    n_embeddings = len(embeddings) 
    
    for i in range(n_embeddings):
        alpha_values = np.linspace(0, 1, n_steps + 2)[1:-1]  # Exclude the 0 and 1 values
        
        for alpha in alpha_values:
            interpolated_embedding = alpha * embeddings[i] + (1 - alpha) * alt_embeddings[i] 
            interpolated_embeddings.append(interpolated_embedding)
            interpolated_idxs.append(i)

    # only return the augmented samples
    return interpolated_embeddings, interpolated_idxs

# Export

In [130]:
def export_data(inputs, labels, embeddings, filename):
    data = {
        'annotation_ids': ann_ids,
        'images': maps,
        'labels': labels,
        'embeddings': embeddings
    }
    np.save(filename, data, allow_pickle=True)

# Test GPT Augmentation

In [126]:
newunseen_sprite = [
    "a buff man with a blue headband and red shirt", 
    "a blue duck with a red headband", 
    "a green woman with blonde hair", 
    "a dog with a black hat", 
    "a man with blue shoes, a red hat, and a green shirt"
]

# # it's probably clk100k_base (can pass to get_encoding()), but let's not assume
# encoding = tiktoken.encoding_for_model(model)
# prompt_size = _compute_tokens_from_payload(_create_payload([]), encoding)
#     
# # chunk the prompt to the right size; if we pass in everything, it's gonna timeout/fail
# threshold = 4000 - prompt_size 
# chunks = _chunk_labels(newunseen_sprite, encoding, threshold=threshold)

alt_labels = get_gpt_alt_labels(newunseen_sprite, model='gpt-4o-mini', num_retries=3, debug=True)


a buff man with a blue headband and red shirt
11
a blue duck with a red headband
8
a green woman with blonde hair
6
a dog with a black hat
6
a man with blue shoes, a red hat, and a green shirt
14
Prompt size: 155
split time = 0.0015180110931396484
Number of loops:  1
Loop 0 running through array of size 5
api call time = 1.4173822402954102


In [128]:
list(zip(newunseen_sprite, alt_labels))

[('a buff man with a blue headband and red shirt',
  'a strong guy wearing a blue headband and a red top'),
 ('a blue duck with a red headband',
  'a blue duck wearing a red band around its head'),
 ('a green woman with blonde hair', 'a lady in green with yellow hair'),
 ('a dog with a black hat', 'a dog wearing a dark hat'),
 ('a man with blue shoes, a red hat, and a green shirt',
  'a man dressed in blue sneakers, a red cap, and a green top')]

In [129]:
np.save('datasets/hmei_temp_test.npy', data, allow_pickle=True)

"['a strong guy wearing a blue headband and a red shirt', 'a blue-colored duck wearing a red band', 'a woman in green with yellow hair', 'a dog wearing a black cap', 'a guy in blue shoes, a red cap, and a green top']"

['a strong guy wearing a blue headband and a red shirt', 'a blue-colored duck wearing a red band', 'a woman in green with yellow hair', 'a dog wearing a black cap', 'a guy in blue shoes, a red cap, and a green top']
['a strong guy wearing a blue headband and a red shirt', 'a blue-colored duck wearing a red band', 'a woman in green with yellow hair', 'a dog wearing a black cap', 'a guy in blue shoes, a red cap, and a green top']


['a strong guy wearing a blue headband and a red shirt',
 'a blue-colored duck wearing a red band',
 'a woman in green with yellow hair',
 'a dog wearing a black cap',
 'a guy in blue shoes, a red cap, and a green top']

ChatCompletion(id='chatcmpl-9oRgixcXuduXyurvX1BgLO80eu0rF', choices=[Choice(finish_reason='stop', index=0, logprobs=None, message=ChatCompletionMessage(content="['a strong guy wearing a blue headband and a red shirt', 'a blue-colored duck wearing a red band', 'a woman in green with yellow hair', 'a dog wearing a black cap', 'a guy in blue shoes, a red cap, and a green top']", role='assistant', function_call=None, tool_calls=None))], created=1721809860, model='gpt-4o-mini-2024-07-18', object='chat.completion', system_fingerprint='fp_661538dc1f', usage=CompletionUsage(completion_tokens=57, prompt_tokens=208, total_tokens=265))

# Tokenize Input

In [51]:
get_sent_word_embeddings(['I am a big foo bar'], model, tokenizer, None)

(array([[ 9.82737169e-02,  1.92254707e-02,  1.50002148e-02,
          2.88651371e-03, -5.50648011e-02, -3.68904248e-02,
          1.18874013e-01,  4.83325757e-02,  6.01447141e-03,
          4.21652161e-02, -3.52831781e-02, -8.36424306e-02,
         -3.50537300e-02, -1.95263885e-02,  2.79278532e-02,
          3.95715386e-02, -1.28606902e-02,  1.15014147e-02,
         -1.06786359e-02, -2.24973243e-02, -1.10274829e-01,
          6.65320083e-02,  1.88118499e-02,  1.76326197e-03,
         -1.19415283e-01, -1.26929749e-02,  4.31086402e-03,
          6.83599897e-03,  8.61172006e-02, -5.21420985e-02,
          1.39685320e-02,  7.56450966e-02,  4.12384309e-02,
         -1.83217507e-02, -8.99099484e-02, -2.86612986e-03,
          4.25359234e-02, -9.02812183e-03,  3.88544612e-02,
          4.52078208e-02,  2.26972681e-02, -9.90414154e-03,
          3.50649208e-02,  3.34182046e-02, -3.00184805e-02,
          1.63097437e-02, -2.29557529e-02,  4.05627266e-02,
          7.01322453e-04, -1.12383990e-0

In [None]:
    # # load data, filter it, and generate the embeddings
    # ann_ids, maps, annotations, authors = get_db_data(cursor=mycursor)
    # print("Number of levels in db: ", len(ann_ids))
    # 
    # ann_ids, maps, annotations = sort_by_annid(ann_ids, maps, annotations)
    # sent_embeddings, word_embeddings = get_sent_word_embeddings(model, tokenizer, annotations, max_len=25)
    # data = {
    #         'ann_ids' : ann_ids,
    #         'images': maps,
    #         'labels': annotations,
    #         'embeddings': sent_embeddings,
    #         'word_embeddings': word_embeddings,
    #     }
    # np.save('datasets/Map Data/maps_noaug.npy', data, allow_pickle=True)

In [None]:
# do gpt alt label augmentation
    ann_ids, maps, annotations, embeddings, authors = gpt_augmentation(ann_ids, maps, annotations, sent_embeddings, authors, model)
    print("Number of levels before filtering: ", len(ann_ids))

    ann_ids_exp = np.array(ann_ids)
    maps_exp = np.array(maps)
    annotations_exp = np.array(annotations)
    embeddings_exp = np.array(embeddings)


    export_data(ann_ids_exp, maps_exp, annotations_exp, embeddings_exp, 'datasets/maps_gpt4_aug.npy')