In [None]:
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.feature_extraction.text import TfidfTransformer,CountVectorizer
from sklearn.metrics import accuracy_score, confusion_matrix,classification_report
import pandas as pd
import numpy as np


In [None]:
# import data
train_df = pd.read_csv('train_15_emoji.csv')
test_df = pd.read_csv('test_15_emoji.csv')
X_train = train_df['cleaned_content']
y_train = train_df['emoji_id']
X_test = test_df['cleaned_content']
y_test = test_df['emoji_id']
# Display names for the classes, as strings.
# classification_report pairs `target_names` positionally with the *sorted*
# label values, whereas unique() returns labels in order of first appearance.
# Sorting here keeps each report row attached to the right emoji id.
my_tags = [str(label) for label in sorted(train_df['emoji_id'].unique())]

# Naive Bayes with TF-IDF features (scikit-learn pipeline)

In [None]:
# build model: raw text -> token counts -> tf-idf weights -> multinomial NB
pipeline_steps = [
    ('vect', CountVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', MultinomialNB()),
]
nb = Pipeline(pipeline_steps)

nb.fit(X_train, y_train)

Pipeline(steps=[('vect', CountVectorizer()), ('tfidf', TfidfTransformer()),
                ('clf', MultinomialNB())])

In [None]:
# model evaluation
y_pred = nb.predict(X_test)
print('accuracy %s' % accuracy_score(y_test, y_pred))
# classification_report matches `target_names` positionally against the sorted
# labels, so build both from the same sorted list; otherwise every row of the
# report can end up carrying another class's name.
report_labels = sorted(set(y_test) | set(y_pred))
print(classification_report(y_test, y_pred,
                            labels=report_labels,
                            target_names=[str(label) for label in report_labels]))

accuracy 0.3848743501511969
              precision    recall  f1-score   support

           7       0.29      0.26      0.28      4462
           6       0.28      0.27      0.28      4386
           8       0.43      0.46      0.44      4466
          10       0.31      0.24      0.27      4501
           5       0.30      0.30      0.30      4513
           0       0.30      0.28      0.29      4514
           1       0.38      0.38      0.38      4504
           4       0.35      0.42      0.39      4458
          12       0.45      0.42      0.44      4370
          13       0.51      0.49      0.50      4426
           9       0.41      0.45      0.43      4468
           2       0.54      0.60      0.56      4508
           3       0.52      0.38      0.44      4532
          11       0.31      0.40      0.35      4452
          14       0.41      0.43      0.42      4571

    accuracy                           0.38     67131
   macro avg       0.39      0.38      0.38     6713

In [None]:
# Top 3 accuracy - overall score
def top_k_prediction(x, k, model=None):
    """Return the k most probable class labels for every sample in ``x``.

    Parameters
    ----------
    x : iterable of samples accepted by ``model.predict_proba``.
    k : int
        Number of labels to return per sample; must be >= 1. If ``k`` exceeds
        the number of classes, all classes are returned.
    model : fitted classifier, optional
        Any object exposing ``predict_proba`` and ``classes_``. Defaults to the
        notebook-level pipeline ``nb``.

    Returns
    -------
    list of numpy.ndarray
        One array per sample holding the top-k *labels* (taken from
        ``model.classes_``), ordered from least to most probable.

    Raises
    ------
    ValueError
        If ``k`` is smaller than 1.
    """
    if k < 1:
        # A slice of [-0:] would silently return every class.
        raise ValueError("k must be a positive integer")
    if model is None:
        model = nb

    prob = np.asarray(model.predict_proba(x))
    classes = np.asarray(model.classes_)

    # argsort is ascending, so the last k columns of each row are the positions
    # of the k largest probabilities.
    top_indices = np.argsort(prob, axis=1)[:, -k:]

    # predict_proba columns follow model.classes_, so translate column
    # positions into the actual labels instead of assuming labels are 0..n-1.
    return list(classes[top_indices])

result = test_df.copy()
# Attach the three most probable emoji ids to every test row.
top3_labels = top_k_prediction(X_test, k = 3)
result["predict_emoji_3"] = top3_labels
# A row is a hit when its true emoji id is among the three candidates.
result["predict_accurate_3"] = [
    true_label in candidates
    for true_label, candidates in zip(result["emoji_id"], top3_labels)
]
# Top 3 accuracy - overall score: hits divided by number of test rows.
n_hits = int(result["predict_accurate_3"].sum())
print("Top 3 Prediction Accuracy:", n_hits / len(result))

Top 3 Prediction Accuracy: 0.627951319062728


# Naive Bayes with bag-of-words (BoW) features, implemented from scratch

In [None]:
from collections import defaultdict
from nltk.corpus import stopwords
import random
import numpy as np
import math

class DataReader:
    """Shuffle a tweet/label corpus reproducibly and build bag-of-words features.

    Despite the attribute names, ``tweet_file`` and ``labels_file`` are
    in-memory sequences (tweets as strings, one label per tweet), not paths.
    """

    # English stop words, loaded lazily on the first tokenize() call and then
    # shared, so the list is not reloaded for every single tweet.
    _stop_words = None

    def __init__(self, tweet_file, labels_file):
        """Store the corpus and fix the shuffling seed.

        tweet_file - sequence of tweet strings
        labels_file - sequence of labels, aligned with ``tweet_file``
        """
        self.tweet_file = tweet_file
        self.labels_file = labels_file
        self.label_set = set()
        # Keep the seed *value*. random.seed() returns None, and re-seeding
        # with None later draws fresh entropy, which would shuffle tweets and
        # labels differently and break their alignment.
        self.seed = 4222
        # Retained from the original constructor: seeds the global RNG.
        random.seed(self.seed)

    def _shuffled(self, items):
        """Return a shuffled copy of ``items`` using a private RNG seeded with ``self.seed``."""
        shuffled = list(items)
        # Same seed and same length give the same permutation, so tweets and
        # labels stay paired; the caller's sequence is left untouched.
        random.Random(self.seed).shuffle(shuffled)
        return shuffled

    def read_tweets(self):
        """Return the tweets as a new list in seeded-shuffle order."""
        return self._shuffled(self.tweet_file)

    def read_labels(self):
        """Return the labels in the same shuffled order as read_tweets().

        Also refreshes ``self.label_set`` with the distinct labels.
        """
        labels_for_tweets = self._shuffled(self.labels_file)
        self.label_set = set(labels_for_tweets)
        return labels_for_tweets

    def get_label_set(self):
        """Return the distinct labels; empty until read_labels() has run."""
        return self.label_set

    @staticmethod
    def tokenize(tweet):
        """Return a bag of words (word -> float count) for ``tweet``.

        Words come from extract_words_from_tweet() (lower-cased, alphabetic,
        at least two letters); English stop words are dropped.
        """
        if DataReader._stop_words is None:
            DataReader._stop_words = frozenset(stopwords.words("english"))
        stops = DataReader._stop_words

        bag_of_words = defaultdict(float)
        for word in DataReader.extract_words_from_tweet(tweet):
            # Extracted words are purely alphabetic, so a '#'/'@' prefix check
            # could never match here; only the stop-word filter is needed.
            if word not in stops:
                bag_of_words[word] += 1.0
        return bag_of_words

    @staticmethod
    def extract_words_from_tweet(tweet):
        """Split ``tweet`` into lower-cased runs of alphabetic characters.

        Runs shorter than two characters are discarded. Any non-alphabetic
        character ends the current run.
        """
        word_list = []
        current = []
        for char in tweet:
            if char.isalpha():
                current.append(char)
                continue
            if len(current) >= 2:
                word_list.append("".join(current).lower())
            # Always start afresh after a separator, so a discarded one-letter
            # run is not glued onto the following word.
            current = []
        # Flush the last run: a tweet ending in a letter has no trailing
        # separator to trigger the append above.
        if len(current) >= 2:
            word_list.append("".join(current).lower())
        return word_list

    def get_features(self):
        """Build a binary word-presence matrix and the matching label array.

        Returns (feature_vector, np_labels), where ``feature_vector`` has
        shape (number of tweets, vocabulary size) and dtype uint8 with a 1
        wherever the tweet contains the word, and ``np_labels`` holds the
        labels in the same shuffled row order. Columns follow the
        alphabetically sorted vocabulary so the layout is reproducible.
        """
        tweets = self.read_tweets()
        labels = self.read_labels()
        np_labels = np.array(labels)

        # Tokenize each tweet once and reuse the result for both passes.
        tokens_per_tweet = [list(DataReader.tokenize(tweet)) for tweet in tweets]

        vocab = set()
        for tokens in tokens_per_tweet:
            vocab.update(tokens)
        mapping = {word: index for index, word in enumerate(sorted(vocab))}

        feature_vector = np.zeros((len(tweets), len(mapping)), dtype=np.uint8)
        for row, tokens in enumerate(tokens_per_tweet):
            for word in tokens:
                feature_vector[row, mapping[word]] = 1
        return feature_vector, np_labels

    @staticmethod
    def get_tokens(bow):
        """
        Returns the number of tokens in the bag of words.
        bow - bag of words representation
        """
        return float(sum(bow.values()))

class NaiveBayes:
    """Naive Bayes tweet classifier over bag-of-words counts with add-alpha smoothing.

    Typical use: construct with the train/test split, call update_model() to
    collect the counts, then classify() / get_top_k_lables() on a bag of words
    produced by DataReader.tokenize().
    """

    def __init__(self, train_tweets, train_labels, test_tweets, test_labels, alpha=1):
        """Store the data split and initialise empty count tables.

        train_tweets, test_tweets - iterables of tweet strings
        train_labels, test_labels - iterables of labels aligned with the tweets
        alpha - pseudocount used to smooth the word likelihoods

        Raises ValueError when the train and test label sets differ.
        """
        self.alpha = alpha
        self.vocabulary = set()
        self.total_tweets_per_class = defaultdict(float)
        self.word_counts_per_class = defaultdict(float)

        self.words_per_class = {}
        self.train_tweets = list(train_tweets)
        self.train_labels = list(train_labels)
        self.test_tweets = list(test_tweets)
        self.test_labels = list(test_labels)
        # Build the sets from the stored lists so one-shot iterables still work.
        self.label_set = set(self.train_labels)
        test_label_set = set(self.test_labels)
        # Compare the sets themselves: equal sizes do not imply equal labels,
        # and an unknown test label would fail later during evaluation.
        if self.label_set != test_label_set:
            print("train:", len(self.label_set), "test:", len(test_label_set))
            raise ValueError("Training and test labels are not the same")

        for label in self.label_set:
            self.words_per_class[label] = defaultdict(float)

        self.prior_count_tweets = 0.0

    def update_model(self):
        """Collect tweet and word counts per class from the training data.

        Counts are rebuilt from scratch on every call, so re-running this
        (for example by re-executing a notebook cell) does not double them.
        """
        self.vocabulary = set()
        self.total_tweets_per_class = defaultdict(float)
        self.word_counts_per_class = defaultdict(float)
        self.words_per_class = {label: defaultdict(float) for label in self.label_set}

        self.prior_count_tweets = len(self.train_tweets)
        for tweet_number, tweet in enumerate(self.train_tweets):
            label = self.train_labels[tweet_number]
            self.total_tweets_per_class[label] += 1.0
            bow = DataReader.tokenize(tweet)
            self.word_counts_per_class[label] += DataReader.get_tokens(bow)
            label_words = self.words_per_class[label]
            for word, count in bow.items():
                label_words[word] += count
            self.vocabulary.update(bow)

    def p_word_given_label_and_pseudocount(self, word, label):
        """
        Returns the probability of word given label wrt pseudo counts.
        word - the token to score
        label - the class whose word distribution is used
        Uses self.alpha as the pseudocount.
        """
        smoothing_mass = self.alpha * len(self.vocabulary)
        # .get() rather than indexing: indexing a defaultdict inserts the key,
        # so scoring unseen words would grow the model during prediction.
        word_count = self.words_per_class[label].get(word, 0.0) + self.alpha
        total_words_label = self.word_counts_per_class.get(label, 0.0) + smoothing_mass
        return word_count / total_words_label

    def log_likelihood(self, bow, label):
        """
        Computes the log likelihood of a set of words given a label.
        bow - a bag of words (i.e., a tokenized document)
        label - the class to score against

        Each distinct word contributes once; the counts stored in ``bow`` are
        not used. NOTE(review): training accumulates counts, so confirm that
        ignoring them here is intentional.
        """
        log_lk = 0
        for word in bow.keys():
            probability = self.p_word_given_label_and_pseudocount(word, label)
            if probability <= 0:
                # Only reachable with alpha == 0 and an unseen word; math.log(0)
                # would raise, so report an impossible document instead.
                return float('-inf')
            log_lk += math.log(probability)
        return log_lk

    def log_prior(self, label):
        """
        Returns the log prior of a document having the class 'label'.

        Add-one smoothed: (tweets in class + 1) / (all tweets + number of
        classes), so the priors of all classes sum to one.
        """
        smoothed_count = self.total_tweets_per_class.get(label, 0.0) + 1
        smoothed_total = self.prior_count_tweets + len(self.label_set)
        return math.log(smoothed_count / smoothed_total)

    def unnormalized_log_posterior(self, bow, label):
        """
        Computes the unnormalized log posterior (of doc being of class 'label').
        bow - a bag of words (i.e., a tokenized document)
        """
        return self.log_prior(label) + self.log_likelihood(bow, label)

    def classify(self, bow):
        """
        Classifies a tweet based on its bag of words representation.
        bow - a bag of words (i.e., a tokenized document)

        Returns the label with the highest unnormalized log posterior, or the
        string '-1' when no label scores above negative infinity.
        """
        max_unnormalized = float('-inf')
        argmax_unnormalized = '-1'
        for label in self.label_set:
            score = self.unnormalized_log_posterior(bow, label)
            if score > max_unnormalized:
                max_unnormalized = score
                argmax_unnormalized = label
        return argmax_unnormalized

    def get_top_k_lables(self, bow, k):
        """
        Returns the k most probable labels for a tweet, best first.
        bow - a bag of words (i.e., a tokenized document)
        k - number of labels to return

        The result is a tuple; it is empty when the model has no labels.
        Equal scores are ordered by label, larger label first.
        """
        scored = [(self.unnormalized_log_posterior(bow, label), label)
                  for label in self.label_set]
        scored.sort(reverse=True)
        return tuple(label for _, label in scored[:k])

    def _per_label_top_k_accuracy(self, tweets, labels, k):
        """Return {label: top-k accuracy in whole percent} for one data split.

        A tweet text that occurs several times under the same label is scored
        only once for that label.
        """
        correct = {label: 0 for label in self.label_set}
        total = {label: 0 for label in self.label_set}
        # Sets give constant-time duplicate checks; a list would make the
        # whole evaluation quadratic in the number of tweets.
        seen = {label: set() for label in self.label_set}

        for tweet, label in zip(tweets, labels):
            if tweet in seen[label]:
                continue
            seen[label].add(tweet)
            total[label] += 1
            bow = DataReader.tokenize(tweet)
            if label in self.get_top_k_lables(bow, k):
                correct[label] += 1

        accuracy = {}
        for label in self.label_set:
            if total[label] == 0:
                accuracy[label] = 0.0
            else:
                # Scale before rounding: rounding first and multiplying after
                # leaves float artifacts such as 56.99999999999999.
                accuracy[label] = float(round(100.0 * correct[label] / total[label]))
        return accuracy

    def evaluate_classifier_accuracy_top_k(self, k):
        """Return per-label top-k accuracy on the train and test splits.

        k - a prediction counts as correct when the true label is among the
            k highest scoring labels

        Returns a DataFrame with columns ``emoji_id``, ``train_accuracy`` and
        ``test_accuracy``; accuracies are in percent.
        """
        train_acc = self._per_label_top_k_accuracy(self.train_tweets, self.train_labels, k)
        test_acc = self._per_label_top_k_accuracy(self.test_tweets, self.test_labels, k)

        acc_df = pd.DataFrame(list(train_acc.keys()), columns=['emoji_id'])
        acc_df['train_accuracy'] = acc_df['emoji_id'].map(train_acc)
        acc_df['test_accuracy'] = acc_df['emoji_id'].map(test_acc)
        return acc_df


In [None]:

train_tweets, train_labels = train_df["cleaned_content"], train_df["emoji_id"]
test_tweets, test_labels = test_df["cleaned_content"], test_df["emoji_id"]

# Tokenize every tweet once and keep its bag of words next to the text.
for frame in (train_df, test_df):
    frame["bow"] = frame["cleaned_content"].apply(DataReader.tokenize)

# Fit the hand-written classifier on the training split.
nb_obj = NaiveBayes(train_tweets, train_labels, test_tweets, test_labels)
nb_obj.update_model()


def predict_top_label(bow):
    """Return the single most probable label for one bag of words."""
    return nb_obj.get_top_k_lables(bow, 1)[0]


# Store the top-1 prediction for both splits.
for frame in (train_df, test_df):
    frame["pred"] = frame["bow"].apply(predict_top_label)


In [None]:

# classification report
from sklearn.metrics import classification_report
# model accuracy, precision and F1:
y_true = test_df["emoji_id"]
y_pred = test_df["pred"]

print('accuracy %s' % accuracy_score(y_true, y_pred))
# classification_report matches `target_names` positionally against the sorted
# labels, so build both from the same sorted list; otherwise every row of the
# report can end up carrying another class's name.
report_labels = sorted(set(y_true) | set(y_pred))
print(classification_report(y_true, y_pred,
                            labels=report_labels,
                            target_names=[str(label) for label in report_labels]))

accuracy 0.33111379243568545
              precision    recall  f1-score   support

           7       0.26      0.20      0.23      4462
           6       0.16      0.30      0.20      4386
           8       0.38      0.40      0.39      4466
          10       0.30      0.19      0.23      4501
           5       0.26      0.23      0.24      4513
           0       0.27      0.27      0.27      4514
           1       0.35      0.33      0.34      4504
           4       0.33      0.37      0.35      4458
          12       0.44      0.37      0.40      4370
          13       0.47      0.41      0.44      4426
           9       0.35      0.40      0.37      4468
           2       0.49      0.47      0.48      4508
           3       0.43      0.33      0.37      4532
          11       0.30      0.33      0.31      4452
          14       0.38      0.38      0.38      4571

    accuracy                           0.33     67131
   macro avg       0.34      0.33      0.33     671

In [None]:
# Per-class top-3 accuracy of the hand-written model, best test score first.
acc_df = (
    nb_obj.evaluate_classifier_accuracy_top_k(3)
    .sort_values(by=['test_accuracy'], ascending=False)
)
# Unweighted mean over the per-class test accuracies.
mean_test_accuracy = acc_df["test_accuracy"].mean()
print( "top 3 Accuracy:")
print(acc_df)
print("Average Top 3 Accuracy is", round(mean_test_accuracy, 2))

top 3 Accuracy:
    emoji_id  train_accuracy  test_accuracy
10        10            73.0           61.0
11        11            68.0           60.0
9          9            70.0           58.0
13        13            69.0           57.0
14        14            70.0           57.0
2          2            68.0           56.0
1          1            69.0           54.0
5          5            69.0           54.0
7          7            64.0           51.0
0          0            67.0           50.0
4          4            66.0           50.0
8          8            65.0           49.0
3          3            64.0           48.0
6          6            63.0           48.0
12        12            49.0           39.0
Average Top 3 Accuracy is 52.8
