In [None]:
# Load the `lab_black` IPython extension for this kernel session.
# NOTE(review): presumably this auto-formats executed cells with black — confirm the extension is installed.
%load_ext lab_black

In [None]:
import pandas as pd
import numpy as np
import torch

from twemoji.twemoji_dataset import TwemojiData, TwemojiBalancedData, TwemojiDataChunks
from embert import Sembert, TopKAccuracy, LiteralModel, Baseline
from tqdm import tqdm

In [None]:
# Run on the GPU when torch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Emoji index ranges used by the prediction helpers below: TEST_IDX is passed to
# the models, TRAIN_IDX is the set that can be masked out of the predictions.
# NOTE(review): presumably 1711 emojis were seen in training and 1810 exist in
# total — confirm against the dataset.
TRAIN_IDX = [*range(1711)]
TEST_IDX = [*range(1810)]

### load model

In [None]:
def get_model(balanced=False):
    """Build a Sembert model, load a stored checkpoint into it and return it in eval mode.

    Params:
        - balanced {bool}: when True the "balanced_sembert_dropout" checkpoint is
            loaded, otherwise the "sembert_dropout" checkpoint

    Returns:
        The Sembert instance, moved to the notebook-level ``device`` and switched
        to evaluation mode.
    """
    checkpoint_path = (
        "trained_models/balanced_sembert_dropout/balanced_sembert_dropout_chunk106.ckpt"
        if balanced
        else "trained_models/sembert_dropout/sembert_dropout_chunk77.ckpt"
    )
    net = Sembert(dropout=0.2).to(device)
    # map_location keeps the load working on machines without the device the
    # checkpoint was saved from.
    state = torch.load(checkpoint_path, map_location=device)
    net.load_state_dict(state)
    net.eval()
    return net

In [None]:
%%capture
# model1 = get_model()
# model2 = get_model(balanced=True)

model1 = LiteralModel()
model2 = Baseline()

### load mapping dicts etc.

In [None]:
# Emoji description table; one row per emoji.
df_des = pd.read_csv("emoji_embedding/data/processed/emoji_descriptions.csv")
# Lookup from emoji id to the emoji character, used to render predictions.
emoji_id_char = dict(zip(df_des.emoji_id, df_des.emoji_char))

In [None]:
# Emoji ids ordered from the highest to the lowest "prevalence" value; the
# first i entries are what `restriction_type=i` masks out of the predictions.
TOP_EMOJIS = (
    pd.read_csv("twemoji/data/twemoji_prevalence.csv")
    .sort_values("prevalence", ascending=False)["emoji_ids"]
    .tolist()
)

### create artificial custom twitter sentences

In [None]:
def get_outputs(model, X, restriction_type=None):
    """Return the model output for ``X`` with selected emoji columns set to 0.

    Params:
        - model {callable}: called as ``model(X, TEST_IDX)``; expected to return a
            2-D tensor with one row per sentence and one column per entry of
            ``TEST_IDX`` (assumed from how the result is masked — TODO confirm
            for every model class)
        - X {list}: list of string sentences that are to be used for prediction
        - restriction_type {int or None}: determines which emoji predictions to
            set to 0
                - None: nothing is masked, the raw model output is returned
                - i > 0: the first i entries of ``TOP_EMOJIS`` (most prevalent
                    emojis) are set to 0
                - i <= 0 (conventionally -1): every emoji in ``TRAIN_IDX`` is set
                    to 0

    Returns:
        Tensor of the same shape as the model output.

    NOTE(review): zeroing only pushes an emoji out of a later top-k when the
    scores are non-negative (e.g. probabilities) — confirm for each model.
    """
    outputs = model(X, TEST_IDX)
    if restriction_type is None:
        return outputs

    # Sets give O(1) membership tests while building the mask.
    if restriction_type > 0:
        excluded_emojis = set(TOP_EMOJIS[:restriction_type])
    else:
        excluded_emojis = set(TRAIN_IDX)

    # A single 0/1 row is enough: it broadcasts over the batch dimension (also
    # for an empty batch). It is created on the outputs' own device so that
    # models which were never moved to the global `device` still work.
    keep_mask = torch.tensor(
        [int(i not in excluded_emojis) for i in TEST_IDX], device=outputs.device
    )
    return outputs * keep_mask


def get_emojis(model, sentences, top_k, emoji_id_char, restricted_type=None):
    """
    For every sentence, print the sentence together with the model's top k
    emojis without any restriction and its top k emojis under the restriction
    given by `restricted_type` (forwarded to `get_outputs`).

    `emoji_id_char` maps a predicted column index to the emoji character shown.
    Nothing is returned.
    """

    def as_emoji_rows(scores):
        # Indices of the k largest scores per row, rendered as emoji characters.
        ranked_ids = torch.topk(scores, top_k, dim=-1).indices
        return [[emoji_id_char[idx.item()] for idx in row] for row in ranked_ids]

    unrestricted_rows = as_emoji_rows(get_outputs(model, sentences))
    restricted_rows = as_emoji_rows(get_outputs(model, sentences, restricted_type))

    for pos, sentence in enumerate(sentences):
        print(
            sentence,
            "### normal prediction",
            unrestricted_rows[pos],
            "### restricted prediction",
            restricted_rows[pos],
            "\n",
        )


def get_proba_for_emoji(model, sentences, emoji_idx, emoji_id_char):
    """
    For given sentences print the score assigned by the model to one
    particular emoji.

    Params:
        - model: model forwarded to `get_outputs`
        - sentences {list}: list of string sentences to score
        - emoji_idx {int}: column of the model output to read; also the key
            used to look up the emoji character in `emoji_id_char`
        - emoji_id_char {dict}: mapping from emoji id to emoji character

    Nothing is returned.
    """
    predictions = get_outputs(model, sentences)
    # Plain column indexing keeps everything on the predictions' device; the
    # previous gather() built its index tensor on the CPU, which fails when the
    # predictions live on the GPU.
    probas = predictions[:, emoji_idx]
    for i, s in enumerate(sentences):
        print(
            s,
            f"probability for {emoji_id_char[emoji_idx]}",
            probas[i].item(),
            "\n",
        )

In [None]:
# Hand-written probe sentences for a qualitative look at the predictions.
sentences = [
    "i like dinosaurs!",
    "i like sauropod!",
    "crocodiles are so awesome.",
    "came back home and saw an elephant",
    "i hate it when people don't text back",
    "war is bad we need peace",
    "second place medal looks good",
    "this football game is lit",
    "I am so angryyy",
    "swinging an axe",
    "I am a baby",
    "crocodile, crocodile, crocodile",
    "crocodile",
    "turtle",
    "I like turtles",
    "I am the dragon master",
    "blue whales are my favorite animals",
    "penis",
    "shake that ass",
    "shake that booty",
    "idiot",
    "do you want to come over tonight?",
    "it's getting a littly cold",
    "he is hitting one after another basket",
]

# Top-5 emojis per sentence for each model, once unrestricted and once with the
# 40 most prevalent emojis masked out.
# NOTE(review): the headers name the Sembert checkpoints, but model1 / model2
# are currently LiteralModel / Baseline — confirm the labels are still right.
for banner, demo_model in (
    ("sembert_dropout", model1),
    ("balanced_sembert_dropout", model2),
):
    print("\n", "*" * 10, banner, "*" * 10, "\n")
    get_emojis(demo_model, sentences, 5, emoji_id_char, restricted_type=40)

In [None]:
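# Disabled example call. NOTE(review): `model_balanced` is not defined in the
# visible cells — substitute a loaded model (e.g. model1 / model2) before re-enabling.
# get_proba_for_emoji(model_balanced, sentences, 1784, emoji_id_char)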

## Contrast balanced vs. non-balanced model

In [None]:
# Candidate suffixes for the synthetic sentences; only the first one is used below.
ending = [
    " is what I love",
    " makes me angry",
    " is such a stupid word!",
]
# One synthetic sentence per row of df_des: "<emoji name><suffix>".
test_text = (df_des["emjpd_emoji_name_og"] + ending[0]).tolist()

In [None]:
def get_topk_hit(
    model, sentences, ids, top_k, emoji_id_char, restricted_type=None, batch_size=32
):
    """Check, sentence by sentence, whether the expected emoji is in the top k.

    The sentences are processed in batches of `batch_size`. Each batch is scored
    twice through `get_outputs`: once without restriction and once with
    `restricted_type`.

    Params:
        - model: model forwarded to `get_outputs`
        - sentences {list}: list of string sentences
        - ids {sequence of int}: expected emoji index for each sentence, compared
            against the top-k column indices of the model output
        - top_k {int}: number of highest-scoring emojis that count as a hit
        - emoji_id_char {dict}: unused; kept for call compatibility
        - restricted_type {int or None}: restriction forwarded to `get_outputs`
        - batch_size {int}: number of sentences scored per model call

    Returns:
        (result, result_restricted): two lists with one boolean per sentence,
        True when the expected id is among the top k (restricted) predictions.

    Raises:
        ValueError: if `sentences` and `ids` differ in length.
    """
    if len(sentences) != len(ids):
        raise ValueError(
            f"sentences and ids must have the same length, "
            f"got {len(sentences)} and {len(ids)}"
        )

    def _hits(scores, targets):
        # True where the target index is one of the k best-scoring columns.
        top_ids = torch.topk(scores, top_k, dim=-1).indices
        return [target in row for target, row in zip(targets, top_ids)]

    result = []
    result_restricted = []
    # Pure inference: no autograd graph is needed for the hit computation.
    with torch.no_grad():
        for start in range(0, len(sentences), batch_size):
            X = sentences[start : start + batch_size]
            y = list(ids[start : start + batch_size])

            result += _hits(get_outputs(model, X), y)
            result_restricted += _hits(get_outputs(model, X, restricted_type), y)

    return result, result_restricted

In [None]:
%%time
result, result_restricted = get_topk_hit(
    model1, test_text, range(len(test_text)), 5, emoji_id_char, restricted_type=40
)

In [None]:
%%time
result_balanced, result_restricted_balanced = get_topk_hit(
    model2, test_text, range(len(test_text)), 5, emoji_id_char, restricted_type=40
)

In [None]:
def get_accuracy(ls):
    """Return the sum of the entries of `ls` divided by their count.

    For a list of boolean hit flags this is the hit rate. An empty list raises
    ZeroDivisionError.
    """
    hit_count = sum(ls)
    total = len(ls)
    return hit_count / total

In [None]:
# Hit rate per evaluation run: model1 first, then model2 ("balanced" labels).
for label, hits in (
    ("normal", result),
    ("restricted", result_restricted),
    ("balanced_normal", result_balanced),
    ("balanced_restricted", result_restricted_balanced),
):
    print(label, get_accuracy(hits))