In [None]:
import pandas as pd
from transformers import DistilBertTokenizer, DistilBertModel
from policy import find_actions, levenshtein_distance
import numpy as np

In [None]:
# The tokenizer and the encoder must be loaded from the same pretrained
# checkpoint; keeping the name in one place prevents the two from drifting apart.
PRETRAINED_CHECKPOINT = "distilbert-base-uncased"

tokenizer = DistilBertTokenizer.from_pretrained(PRETRAINED_CHECKPOINT)
model = DistilBertModel.from_pretrained(PRETRAINED_CHECKPOINT)



In [None]:
# Register "[EMT]" as an additional special token; later cells use its id as the "empty" placeholder.
# NOTE(review): add_special_tokens appends a new vocabulary entry (assuming "[EMT]" is not
# already present); it does not overwrite the existing "[unused0]" entry.
print("initial tokenizer vocab size: ", len(tokenizer))
# Add the token, then resize the model's token embeddings to the tokenizer's new length.
tokenizer.add_special_tokens({'additional_special_tokens': ['[EMT]']})
model.resize_token_embeddings(len(tokenizer))



In [None]:
# Smoke test: tokenize the bare "[EMT]" string into PyTorch tensors and show the
# resulting encoding as the cell output.
test_str = ["[EMT]"]
inputs = tokenizer(test_str, return_tensors="pt")
inputs

In [None]:
# Run the model once on the "[EMT]" encoding built above.
# NOTE(review): presumably a sanity check that the resized embeddings accept the new token id — confirm.
outputs = model(**inputs)


In [None]:
# Load Tweets.csv (path is relative to the working directory) and pull the
# "text" column into a plain Python list.
df = pd.read_csv("Tweets.csv")
texts = df["text"].tolist()
# Preview the first ten entries as the cell output.
texts[:10]

In [None]:
# Vocabulary id of the "[EMT]" token; used below as the "empty" placeholder id.
EMT_ID = tokenizer.convert_tokens_to_ids("[EMT]")

def add_empty_ids(input_ids, empty_id):
    """Return ``input_ids`` normalised so that empty ids and real ids alternate.

    Any ``empty_id`` entries already present in ``input_ids`` are ignored; the
    result always starts and ends with ``empty_id`` and has exactly one
    ``empty_id`` between consecutive real ids.

    Args:
        input_ids: iterable of token ids (may already contain ``empty_id``).
        empty_id: id used as the "empty" placeholder.

    Returns:
        A new list of the form ``[empty, t0, empty, t1, ..., empty]``; for an
        input with no real ids this is ``[empty_id]``.
    """
    interleaved = [empty_id]
    # Dropping every empty id first makes the "collapse runs of empties" rule
    # implicit: each surviving id is simply followed by one fresh placeholder.
    for real_id in (tok for tok in input_ids if tok != empty_id):
        interleaved.extend((real_id, empty_id))
    return interleaved


def get_labels(x_ids, y_ids, vocab_size):
    """Build a per-position target matrix for turning ``x_ids`` into ``y_ids``.

    Both inputs are expected in the "empty tokenized" layout, where every
    other entry is the empty token; the real tokens sit at the odd indices
    and are what gets passed to ``find_actions``.

    Each action is unpacked as ``(kind, index, token)`` and marks one cell:
      * ``"R"``: row ``2 * index + 1`` (the real-token slot), column ``token``.
      * ``"I"``: row ``2 * index`` (the empty slot just before it), column ``token``.
      * ``"D"``: row ``2 * index + 1``, column ``EMT_ID``.
    Other action kinds are ignored. Presumably R/I/D stand for
    replace/insert/delete; confirm against ``find_actions``.

    Args:
        x_ids: current sequence in empty-tokenized layout.
        y_ids: target sequence in empty-tokenized layout.
        vocab_size: number of columns in the returned matrix.

    Returns:
        A ``(len(x_ids), vocab_size)`` array of zeros and ones. Rows that
        received no action get a one at the column of their current id in
        ``x_ids``.
    """
    source_tokens = x_ids[1:-1:2]
    target_tokens = y_ids[1:-1:2]
    edit_actions = find_actions(source_tokens, target_tokens)

    labels = np.zeros((len(x_ids), vocab_size))
    for kind, index, token in edit_actions:
        if kind == "R":
            labels[2 * index + 1, token] = 1
        elif kind == "I":
            labels[2 * index, token] = 1
        elif kind == "D":
            labels[2 * index + 1, EMT_ID] = 1

    # Rows no action touched keep their current token as the target.
    untouched_rows = np.flatnonzero(labels.sum(axis=1) == 0)
    current_ids = np.array(x_ids)
    labels[untouched_rows, current_ids[untouched_rows]] = 1

    return labels


def empty_tokenized_to_str(sequence):
    """Decode an empty-tokenized id sequence back into text.

    Only the entries at odd indices (excluding the final element) are passed
    to the module-level ``tokenizer``; these are the slots that hold real
    tokens in the layout produced by ``add_empty_ids``.
    """
    real_token_ids = sequence[1:-1:2]
    return tokenizer.decode(real_token_ids)
        
    


In [None]:
# Demo: walk from one random text to another, one single-token edit per step,
# always following the labels produced by get_labels.
MAX_EDIT_STEPS = 100    # upper bound on the number of single-token edits tried
TIE_BREAK_NOISE = 0.1   # well below 1, so noise can never outrank a real (== 1) label

# Keep the raw strings under their own names so that initial_sequence /
# target_sequence only ever hold lists of token ids.
initial_text = np.random.choice(texts)
target_text = np.random.choice(texts)

print("INITIAL", initial_text)
print("TARGET", target_text)

# Drop the first and last ids the tokenizer adds around the text
# (presumably [CLS] / [SEP] — confirm).
initial_sequence = tokenizer(initial_text)["input_ids"][1:-1]
target_sequence = tokenizer(target_text)["input_ids"][1:-1]

distance = levenshtein_distance(initial_sequence, target_sequence)
print("DISTANCE", distance)

# Switch both sequences to the empty-tokenized layout expected by get_labels.
initial_sequence = add_empty_ids(initial_sequence, EMT_ID)
target_sequence = add_empty_ids(target_sequence, EMT_ID)

for i in range(MAX_EDIT_STEPS):
    labels = get_labels(initial_sequence, target_sequence, len(tokenizer))
    # Small uniform noise; presumably there to break ties at random when a row
    # carries more than one label.
    labels += np.random.uniform(0, TIE_BREAK_NOISE, labels.shape)
    new_ids = np.argmax(labels, axis=1)

    # Positions where the labelled id differs from the current one.
    changed_positions = np.flatnonzero(new_ids != np.asarray(initial_sequence))
    if changed_positions.size > 0:
        # Apply exactly one randomly chosen edit, then re-normalise the layout.
        replacement_id = np.random.choice(changed_positions)
        # int(...) keeps the id list made of plain Python ints, not numpy scalars.
        initial_sequence[replacement_id] = int(new_ids[replacement_id])
        initial_sequence = add_empty_ids(initial_sequence, EMT_ID)
    print(i + 1, empty_tokenized_to_str(initial_sequence))

    if initial_sequence == target_sequence:
        print("DONE")
        break
    

