In [None]:
%pip install Unidecode nltk, emoji, pandas

In [52]:
import pandas as pd

class DataPreprocessor:

    def __init__(self):
        self.data = []

    def load_data_from_csv(self, filepath):
        df = pd.read_csv(filepath)
        self.data.append(df)

    def combine_data(self):
        self.data = pd.concat(self.data)

In [273]:
import nltk
from nltk.stem import PorterStemmer
from collections import defaultdict

import emoji
import string
from itertools import tee
import csv
import json
import re

class Indexer:

    def __init__(self):
        self.inverted_index = defaultdict(dict)
        self.emoji_dict = defaultdict()
        self.stopwords = nltk.corpus.stopwords.words('english')
        self.stopwords += list(string.punctuation)
        self.ps = PorterStemmer()
        self.translator = str.maketrans('', '', string.punctuation)

    def _clean_text(self, text: string):
        text = re.sub(r"http\S+|www\S+|https\S+", '', text, flags=re.MULTILINE)
        text = text.translate(self.translator)
        words = nltk.word_tokenize(text)
        words = [self.ps.stem(a) for a in words] #if a not in self.stopwords
        return words

    def index_data(self, text):
        words = self._clean_text(text)
        try:
            emojis = []
            clean_words = []
            for word in words:
                if(emoji.purely_emoji(word)):
                    emojis.append(word)
                    clean_words.append(word)
                elif emoji.emoji_count(word) > 0:
                    extracted_emojis = "".join([em[0] for em in emoji.analyze(word, join_emoji=True)])
                    word = emoji.replace_emoji(word, replace="")
                    emojis.append(extracted_emojis)
                    clean_words.append(word)
                    clean_words.append(extracted_emojis)
                else:
                    clean_words.append(word)

            current_words = []
            for i, word in enumerate(clean_words):
                if word not in self.inverted_index:
                    self.inverted_index[word] = {'count': 0, 'emojis': {}}
                if not emoji.purely_emoji(word):
                    current_words.append(word)
                else:    
                    if word not in self.emoji_dict:
                        self.emoji_dict[word] = 1
                    else:
                        self.emoji_dict[word] += 1
                    self.inverted_index[word]['count'] += 1
                    for e in emojis:
                        emoji_offset = abs(clean_words.index(e) - i) + 1
                        if(emoji_offset == 1): # itself
                            continue
                        if e not in self.inverted_index[word]['emojis']:
                            self.inverted_index[word]['emojis'][e] = []
                        self.inverted_index[word]['emojis'][e].append(emoji_offset)
                    
                    for w in current_words:
                        self.inverted_index[w]['count'] += 1
                        word_offset = abs(clean_words.index(w) - i) + 1
                        if w not in self.inverted_index[w]['emojis']:
                            self.inverted_index[w]['emojis'][word] = []
                        self.inverted_index[w]['emojis'][word].append(word_offset)
                    current_words = []
        except StopIteration:
            pass
    
    def save_metadata(self, filepath):
        with open(filepath, 'w') as f:
            json.dump(self.emoji_dict, f)

    def save_index(self, filepath):
        with open(filepath, 'w') as f:
            json.dump(self.inverted_index, f)

    def save_index_csv(self, filepath):
        with open(filepath, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for word in self.inverted_index:
                writer.writerow([word, self.inverted_index[word]['count'], self.inverted_index[word]['emojis']])

    def read_index(self, filepath):
        self.inverted_index = defaultdict(dict)
        with open(filepath, 'r') as f:
            self.inverted_index = json.load(f)
    
    def read_meta(self, filepath):
        self.emoji_dict = defaultdict()
        with open(filepath, 'r') as f:
            self.emoji_dict = json.load(f)

# i = Indexer()
# i.index_data("good good luck 😊😊 you dawg🐕🐕 qhoo👨‍👩🏿‍👧🏻‍👦🏾 aah 😮‍💨")
# i.save_index_csv("output/index.csv")
# i.save_index("output/index.json")
# i.save_metadata("output/meta.json")


In [351]:
import math

class QueryEngine:

    def __init__(self, index, meta):
        self.index = index
        self.meta = meta
        self.query_result = defaultdict(dict)

    # Vanilla (ish) tf-idf
    def process_query_tf_idf(self, search_query, cleaner):
        self.query_result = defaultdict(dict)
        query = cleaner(search_query)
        n = len(self.index)
        for i, query_term in enumerate(query):
            postings = self.index[query_term] if query_term in self.index else None
            if postings is None:
                continue
            query_weight = 1.0
            query_tf = len([q for q in query if q == query_term])
            query_weight *= query_tf

            #n = len(self.meta)
            df_t = len(postings)
            idf_t = math.log(n / df_t)
            for emo, offset_list in postings['emojis'].items():
                if emo not in self.query_result:
                    self.query_result[emo] = {'query': search_query.split(' ')[i], 'raw': emo, 'emoji': emo, 'score': 0}
                self.query_result[emo]['score'] += (query_tf * idf_t * postings['count']) 
            # normalize
        for emo, info in self.query_result.items():
            self.query_result[emo]['score'] /=  self.meta[emo]


        return self.query_result
    
        # Vanilla (ish) tf-idf
    def process_query_tf_idf2(self, search_query, cleaner):
        self.query_result = defaultdict(dict)
        query = cleaner(search_query)
        n = len(self.index)
        for i, query_term in enumerate(query):
            postings = self.index[query_term] if query_term in self.index else None
            if postings is None:
                continue
            query_weight = 1.0
            query_tf = len([q for q in query if q == query_term])
            query_weight *= query_tf

            #n = len(self.meta)
            df_t = len(postings)
            idf_t = math.log(n / df_t)
            for emo, offset_list in postings['emojis'].items():
                avg_offset = sum(offset_list) / len(offset_list) if offset_list else 1
                if emo not in self.query_result:
                    self.query_result[emo] = {'query': search_query.split(' ')[i], 'raw': emo, 'emoji': emo, 'score': 0}
                self.query_result[emo]['score'] += (query_tf * idf_t * postings['count']) / avg_offset
            # normalize
        for emo, info in self.query_result.items():
            self.query_result[emo]['score'] /=  self.meta[emo]


        return self.query_result


In [358]:
def print_top_emojis(query_result, search, n_per_word=3, n_overall=5):
    query_words = search.split(' ')
        
        # Print top emojis for each word
    for word in query_words:
        word_emojis = [(emoji, info['score']) for emoji, info in query_result.items() if info['query'] == word]
        word_emojis.sort(key=lambda x: x[1], reverse=True)
        top_emojis = word_emojis[:n_per_word]
        print(f"For the word '{word}', the top {n_per_word} emojis are: {top_emojis}")

        # Print top emojis overall
    all_emojis = [(emoji, info['score']) for emoji, info in query_result.items()]
    all_emojis.sort(key=lambda x: x[1], reverse=True)
    top_emojis = all_emojis[:n_overall]
    print(f"The top {n_overall} emojis overall are: {top_emojis}")

In [359]:

import glob
preprocessor = DataPreprocessor()

csv_files = glob.glob('data/clean/*.csv')

for filename in csv_files:
    preprocessor.load_data_from_csv(filename)
preprocessor.combine_data()

# Index data
indexer = Indexer()

preprocessor.data['text'].apply(lambda x: indexer.index_data(x))

indexer.save_index('output/index.json')
indexer.save_metadata('output/meta.json')
indexer.save_index_csv('output/index.csv')

In [360]:
print("reading data from file")
indexer = Indexer()
indexer.read_index("output/index.json")
indexer.read_meta("output/meta.json")


reading data from file


In [397]:
# Query data
engine = QueryEngine(indexer.inverted_index, indexer.emoji_dict)
query = "gamer"
engine.process_query_tf_idf(query, indexer._clean_text)
print_top_emojis(engine.query_result, query)



For the word 'gamer', the top 3 emojis are: [('🖥🖱😎👊', 734.4372345011052), ('😎🕹', 734.4372345011052), ('🧪🎮', 734.4372345011052)]
The top 5 emojis overall are: [('🖥🖱😎👊', 734.4372345011052), ('😎🕹', 734.4372345011052), ('🧪🎮', 734.4372345011052), ('🆘🆘', 367.2186172505526), ('🎮🎮', 183.6093086252763)]


In [340]:
with open("data.csv", 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            for word in self.inverted_index:
                writer.writerow([word, self.inverted_index[word]['count'], self.inverted_index[word]['emojis']])
