In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np
import pandas as pd
from torchvision import transforms

# Seed PyTorch's global random number generator so torch-side randomness is
# repeatable across runs.
# NOTE(review): this does not seed Python's `random` module, which the notebook
# later uses for `random.shuffle`; no `random.seed(...)` call is visible here.
torch.manual_seed(1)

In [None]:
class EmojiDataset(Dataset):
    """Map-style dataset of (tweet text, emoji label) rows read from a CSV file.

    Column 0 of the CSV is read as the tweet text and column 1 as the emoji
    label. Each item is a dict ``{'words': <np.ndarray of tokens>, 'emoji': <label>}``,
    optionally passed through ``transform``.
    """

    def __init__(self, csv_file, transform=None):
        """Load the CSV into memory.

        Args:
            csv_file: Path or file-like object accepted by ``pandas.read_csv``.
            transform: Optional callable applied to each sample dict before it
                is returned from ``__getitem__``.
        """
        self.tweets_frame = pd.read_csv(csv_file)
        self.transform = transform

    def __len__(self):
        """Return the number of rows (samples) in the loaded CSV."""
        # Fix: this used to read ``self.landmarks_frame``, an attribute that is
        # never set, so ``len(dataset)`` raised AttributeError.
        return len(self.tweets_frame)

    def __getitem__(self, index):
        """Return the sample at ``index``.

        Args:
            index: Integer position (or a tensor holding one) of the row.

        Returns:
            A dict with keys ``'words'`` (the text split on single spaces, as a
            NumPy array) and ``'emoji'`` (the label from column 1), or whatever
            ``transform`` returns for that dict when a transform is set.

        Raises:
            IndexError: If ``index`` is out of range (raised by ``DataFrame.iloc``);
                this is also what terminates plain ``for sample in dataset`` loops.
        """
        if torch.is_tensor(index):
            index = index.tolist()

        # str() guards against non-string cells (e.g. NaN for an empty field).
        sentence = str(self.tweets_frame.iloc[index, 0])
        words = np.array(sentence.split(" "))
        emoji = self.tweets_frame.iloc[index, 1]

        sample = {'words': words, 'emoji': emoji}
        if self.transform:
            sample = self.transform(sample)

        return sample

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive. The CSV loaded later in this notebook
# is read from a path under this mount point, so this cell must run first.
drive.mount('/content/drive')

In [None]:

from collections import defaultdict

# Per-class counters for the emoji labels we keep; each starts at zero and is
# incremented as matching samples are collected below.
emojis_we_want = dict.fromkeys(
    [
        ':red_heart:',
        ':face_with_tears_of_joy:',
        ':loudly_crying_face:',
        ':smiling_face_with_heart-eyes:',
        ':fire:',
        ':folded_hands:',
        ':weary_face:',
        ':person_shrugging:',
        ':two_hearts:',
        ':sparkles:',
    ],
    0,
)

# Maximum number of samples to keep per emoji class.
num_in_emoji = 30000

training_data = EmojiDataset(csv_file='/content/drive/MyDrive/Colab Notebooks/py_dev_clean.csv')
train_sample = []
done = 0
i = 0

# Walk the dataset once, keeping a sample only while its class is still under
# the per-class cap; stop early once every class has reached the cap.
for sample in training_data:
    label = sample['emoji']
    if label in emojis_we_want and emojis_we_want[label] < num_in_emoji:
        train_sample.append(sample)
        emojis_we_want[label] += 1

    if sum(emojis_we_want.values()) >= len(emojis_we_want) * num_in_emoji:
        break



In [None]:
import random

# Shuffle in place, then carve off the first 20% as the held-out test split.
# NOTE(review): this cell is not idempotent - re-running it re-splits the
# already reduced train_sample and replaces test_sample.
random.shuffle(train_sample)

cut = int(0.2 * len(train_sample))
test_sample, train_sample = train_sample[:cut], train_sample[cut:]

In [None]:
# Report split sizes: training count on the first line, test count on the second.
print(len(train_sample), len(test_sample), sep="\n")

In [None]:
# Build the vocabulary: every distinct token gets the next free integer index,
# in order of first appearance (training split first, then test split).
# NOTE(review): test tokens are included so that lookups on test sentences
# cannot fail; no unknown-word index is defined.
word_to_ix = {}
for split in (train_sample, test_sample):
    for sample in split:
        for word in sample['words']:
            # setdefault leaves existing entries alone and only assigns new ones.
            word_to_ix.setdefault(word, len(word_to_ix))
print(len(word_to_ix))

In [None]:
# Build the label mapping: each distinct emoji seen in the training split gets
# the next free integer index, in order of first appearance.
tag_to_ix = {}
for sample in train_sample:
    tag_to_ix.setdefault(sample['emoji'], len(tag_to_ix))
print(tag_to_ix)

In [None]:
def prepare_sequence(seq, to_ix):
    """Convert a sequence of keys into a 1-D ``torch.long`` tensor of indices.

    Args:
        seq: Iterable of keys (e.g. tokens or labels).
        to_ix: Mapping from key to integer index.

    Returns:
        A ``torch.long`` tensor holding ``to_ix[key]`` for each key, in order.

    Raises:
        KeyError: If any key in ``seq`` is missing from ``to_ix``.
    """
    indices = []
    for key in seq:
        indices.append(to_ix[key])
    return torch.tensor(indices, dtype=torch.long)

In [None]:
class LSTMTagger(nn.Module):
    """Bidirectional LSTM that scores every token of a sentence against each tag.

    The model embeds token indices, runs them through a bidirectional LSTM,
    applies ReLU to the LSTM outputs, and projects each position to
    ``tagset_size`` log-probabilities.
    """

    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size):
        """
        Args:
            embedding_dim: Size of each word embedding vector.
            hidden_dim: Hidden size of the LSTM, per direction.
            vocab_size: Number of distinct token indices.
            tagset_size: Number of output classes.
        """
        super(LSTMTagger, self).__init__()
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim

        self.word_embeddings = nn.Embedding(vocab_size, embedding_dim)

        # Tensor of all tag indices [0, tagset_size). This used to be built from
        # the notebook globals ``emojis`` / ``tag_to_ix`` / ``prepare_sequence``,
        # which made the class unusable unless those happened to be defined
        # first. Because tag indices are assigned contiguously from 0, deriving
        # the tensor from ``tagset_size`` yields the same values with no hidden
        # dependency on global state.
        self.emojis = torch.arange(tagset_size, dtype=torch.long)

        # The LSTM takes word embeddings as inputs, and outputs hidden states
        # with dimensionality hidden_dim (per direction).
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True)

        # The linear layer that maps from hidden state space to tag space;
        # the input is 2 * hidden_dim because both directions are concatenated.
        self.hidden2tag = nn.Linear(hidden_dim*2, tagset_size)

    def forward(self, sentence):
        """Score each token of one sentence.

        Args:
            sentence: 1-D ``torch.long`` tensor of token indices.

        Returns:
            Tensor of shape ``(len(sentence), tagset_size)`` holding per-token
            log-probabilities over the tags.
        """
        seq_len = len(sentence)
        embeds = self.word_embeddings(sentence)
        # nn.LSTM (batch_first=False) expects (seq_len, batch, features); batch is 1.
        lstm_out, _ = self.lstm(embeds.view(seq_len, 1, -1))
        lstm_out = torch.relu(lstm_out)

        tag_space = self.hidden2tag(lstm_out.view(seq_len, -1))
        tag_scores = F.log_softmax(tag_space, dim=1)
        return tag_scores

In [None]:
# Model hyper-parameters: embedding width and per-direction LSTM hidden size.
EMBEDDING_DIM, HIDDEN_DIM = 100, 64

# All emoji labels (a live view over the tag_to_ix keys) and their indices.
emojis = tag_to_ix.keys()
emojis_tensor = prepare_sequence(emojis, tag_to_ix)

# Stand-alone embedding table with one row per emoji label, and the embedded
# vectors for every label.
# NOTE(review): neither emoji_embeddings nor emoji_embeds is referenced by any
# other cell visible in this notebook.
emoji_embeddings = nn.Embedding(num_embeddings=len(emojis), embedding_dim=EMBEDDING_DIM)
emoji_embeds = emoji_embeddings(emojis_tensor)




In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

# Build the tagger and move its parameters to the selected device.
model = LSTMTagger(EMBEDDING_DIM, HIDDEN_DIM, len(word_to_ix), len(tag_to_ix)).to(device)

# NLLLoss pairs with the log_softmax output produced by the model.
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)
model.train()


for epoch in range(3):
    epoch_loss = 0      # running sum of per-sample losses for this epoch
    partial_loss = 0    # running sum since the last progress print
    samples_seen = 0

    # One optimizer step per sample (batch size 1).
    for samples_seen, sample in enumerate(train_sample, start=1):
        sentence, tag = sample['words'], sample['emoji']

        # Clear gradients accumulated by the previous sample.
        model.zero_grad()

        # Turn the tokens into index tensors. The sentence-level emoji label is
        # repeated once per token, so every position is trained to predict it.
        sentence_in = prepare_sequence(sentence, word_to_ix).to(device)
        targets = prepare_sequence([tag]*len(sentence), tag_to_ix).to(device)

        # Forward pass and loss.
        tag_scores = model(sentence_in)
        loss = loss_function(tag_scores, targets)

        loss_value = loss.item()
        epoch_loss += loss_value
        partial_loss += loss_value

        # Backward pass and parameter update.
        loss.backward()
        optimizer.step()

        # Every 1000 samples: epoch, count, mean loss so far, mean loss over the last 1000.
        if samples_seen % 1000 == 0:
            print(epoch, samples_seen, epoch_loss/samples_seen, partial_loss/1000)
            partial_loss = 0

        

# See what the scores are after training

In [None]:
import scipy.stats as stats
import random
# Evaluate the trained model on the held-out test split and collect per-emoji
# confusion counts (true_pos / false_pos / false_neg / true_neg).

# Inverse label mapping: class index -> emoji name.
ix_to_emoji = {v:k for k,v in tag_to_ix.items()}
x = 0

i = 0

# NOTE(review): shuffling train_sample has no effect on the evaluation below;
# it is kept only so this cell leaves the same notebook state as before.
random.shuffle(train_sample)

true_pos = 0
total = 0
model.eval()

from collections import defaultdict

# One counter dict per emoji; missing counters read as 0.
prediction_nums = {}
for emoji in tag_to_ix.keys():
  prediction_nums[emoji] = defaultdict(int)

print(prediction_nums)
with torch.no_grad():
  for sample in test_sample:
    sentence = sample['words']
    actual = sample['emoji']
    inputs = prepare_sequence(sentence, word_to_ix).to(device)
    tag_scores = model(inputs)

    # Per-token predicted class, then a majority vote over the sentence.
    scores = torch.argmax(tag_scores, dim=1)
    # Fix: `stats.mode(...)[0][0]` breaks on SciPy releases where mode() returns
    # a scalar for 1-D input. bincount + argmax is version-independent and keeps
    # the same tie-break (the smallest class index among the most frequent).
    votes = np.bincount(scores.cpu().numpy(), minlength=len(ix_to_emoji))
    predicted = ix_to_emoji[int(votes.argmax())]

    if predicted == actual:
      prediction_nums[actual]['true_pos'] += 1
      true_pos += 1
    else:
      prediction_nums[predicted]['false_pos'] += 1
      prediction_nums[actual]['false_neg'] += 1

    # Fix: true negatives were previously only counted on correct predictions.
    # Every class that is neither the predicted nor the actual label is a true
    # negative for this sample, whether or not the prediction was right.
    for temp_emoji in prediction_nums.keys():
      if temp_emoji != predicted and temp_emoji != actual:
        prediction_nums[temp_emoji]['true_neg'] += 1

    total += 1
    # Periodic running accuracy.
    if i%10000 == 0:
      print(true_pos/total)
    i+=1

print(prediction_nums)

In [None]:
def _safe_ratio(numerator, denominator):
  """Return numerator / denominator, or 0.0 when the denominator is zero."""
  return numerator / denominator if denominator else 0.0


# Per-emoji precision, recall and F1 from the confusion counts gathered above.
# Fix: a class that was never predicted (true_pos + false_pos == 0), never
# present in the test split, or with precision + recall == 0 used to raise
# ZeroDivisionError; such metrics are now reported as 0.0.
for emoji in prediction_nums.keys():
  counts = prediction_nums[emoji]
  hits = counts['true_pos']
  counts['precision'] = _safe_ratio(hits, hits + counts['false_pos'])
  counts['recall'] = _safe_ratio(hits, hits + counts['false_neg'])
  f1 = 2*_safe_ratio(counts['precision']*counts['recall'], counts['precision']+counts['recall'])
  print(emoji, "prec:", counts['precision'], "recall:", counts['recall'])
  print("f1:", f1)