In [35]:
import numpy as np

In [36]:
titles = np.loadtxt('./title-scraper/titles.txt', encoding='utf-8', dtype='str', delimiter = '\n')
titles

array(['تشکیل شورایی ویژه دانشجویان ایرانی اوکراین/ وجود ۲۲۰ پروژه نیمه تمام بیمارستانی در کشور',
       'خانه\u200cتکانی اساسی و رونق دوباره شرکت\u200cهای خدماتی',
       'بازداشت وزرای فرهنگ و خارجه دولت مکلف لیبی در آستانه ادای سوگند',
       ..., 'تساوی ارومیه و ایلام در دیدار معوقه لیگ فوتبال زنان',
       'سهم استان مرکزی در طرح جهش تولید مسکن ۶۱ هزار واحد است',
       'نظر متفاوت گروهی از محققان درباره نزدیکترین سیاهچاله به زمین'],
      dtype='<U157')

In [37]:
from sklearn.model_selection import train_test_split
titles_train, titles_test = train_test_split(titles, random_state=42, test_size=0.2)

In [38]:
print("Training Set Size: ", titles_train.shape[0])
print("Test Set Size: ", titles_test.shape[0])

Training Set Size:  2187
Test Set Size:  547


In [39]:
TRAINING_SET_PATH = './dataset/titles_train.txt'
TEST_SET_PATH = './dataset/titles_test.txt'

In [40]:
np.savetxt(TRAINING_SET_PATH, titles_train, encoding='utf-8', delimiter='\n', fmt='%s')
np.savetxt(TEST_SET_PATH, titles_test, encoding='utf-8', delimiter='\n', fmt='%s')

In [41]:
import re

class DataProcessor:
    def __init__(self, PATH, is_test_set=0):
        self.read_data(PATH)
        self.clean_text()
        self.tokenizer()
        self.get_most_freq(is_test_set)
        self.handle_unknown()
        
    
    def read_data(self, PATH):
        self.data = np.loadtxt(PATH, encoding='utf-8', dtype='str', delimiter='\n')

    def clean_text(self):
        local_data = self.data
        self.cleaned_data = []
        for title in local_data:
            title2 = re.sub(r'[^۰-۹آ-ی؟.\s]','', title)
            title2 = re.sub(r'[۰-۹]+', 'N', title2)
            title2 = '\s ' + title2 + ' \e'
            self.cleaned_data.append(title2)
    
    def tokenizer(self):
       self.tokenized_data = list(map(lambda x: x.split(), self.cleaned_data))
    
    def get_most_freq(self, is_test_set):
        frequencies = {}
        n_tokens = 0
        for title_list in self.tokenized_data:
            for token in title_list:
                if token in frequencies:
                    frequencies[token] += 1
                else:
                    frequencies[token] = 1
                n_tokens += 1

        frequencies_pair_list = list(frequencies.items())
        frequencies_pair_list.sort(key=lambda x: x[1], reverse=True)
        
        if not is_test_set:
            with open('frequent.txt', 'w', encoding='utf-8') as f:
                most_frequencies_list = frequencies_pair_list[0:200]
                tokens_string = ''
                for token,value in most_frequencies_list:
                    tokens_string += token + ',' + str(value) + '\n'
                f.write(tokens_string)

        self.n_unique_tokens = len(frequencies_pair_list)
        self.n_tokens = n_tokens
        self.frequencies_dict = dict(frequencies_pair_list[0:10000])

        return (self.frequencies_dict, self.n_tokens, self.n_unique_tokens)

    def handle_unknown(self):
        for i in range(len(self.tokenized_data)):
            for j in range(len(self.tokenized_data[i])):
                if self.tokenized_data[i][j] not in self.frequencies_dict:
                    self.tokenized_data[i][j] = 'unk'
        self.prepared_data = self.tokenized_data
    
    def info(self):
        print("Number of Unique Tokens: ", self.n_unique_tokens)
        sum_len = sum([len(title) for title in self.prepared_data])
        print("Length of Sentences Average: {0:.2f}".format(sum_len/len(self.prepared_data)))


In [42]:
class NgramLanguageModel:
    def __init__(self, data):
        self.make_ngram_dicts(data)
    
    def make_ngram_dicts(self, data):
        # unigram_dict
        unigram_dict = {}
        for title_list in data:
            for token in title_list:
                if token in unigram_dict:
                    unigram_dict[token] += 1
                else:
                    unigram_dict[token] = 1
        self.unigram_dict = unigram_dict

        # bigram_dict
        bigram_dict = {}
        for title_list in data:
            for i in range(1, len(title_list)):
                if (title_list[i-1], title_list[i]) in bigram_dict:
                        bigram_dict[(title_list[i-1], title_list[i])] += 1
                else:
                    bigram_dict[(title_list[i-1], title_list[i])] = 1
        self.bigram_dict = bigram_dict

        # trigram_dict
        trigram_dict = {}
        for title_list in data:
            for i in range(2, len(title_list)):
                if (title_list[i-2], title_list[i-1], title_list[i]) in trigram_dict:
                        trigram_dict[(title_list[i-2], title_list[i-1], title_list[i])] += 1
                else:
                    trigram_dict[(title_list[i-2], title_list[i-1], title_list[i])] = 1
        self.trigram_dict = trigram_dict               

    def cal_2gram_prob(self, v1, v2, N_vocabs):
        if (v1, v2) not in self.bigram_dict:
            self.bigram_dict[(v1,v2)] = 0
        if v1 not in self.unigram_dict:
            self.unigram_dict[v1] = 0

        return (self.bigram_dict[(v1, v2)] + 1)/(self.unigram_dict[v1] + N_vocabs)

    def cal_3gram_prob(self, v1, v2, v3, N_vocabs):
        if (v1, v2, v3) not in self.trigram_dict:
            self.trigram_dict[(v1,v2,v3)] = 0
        if (v1,v2) not in self.bigram_dict:
            self.bigram_dict[(v1,v2)] = 0  
        
        return (self.trigram_dict[(v1, v2, v3)] + 1)/(self.bigram_dict[(v1, v2)] + N_vocabs)

    def calculate_smoothed_probs(self, n, string):
        N_vocabs = len(self.unigram_dict.keys())
        vocabs = list(self.unigram_dict.keys())
        probs_list = []
        if n == 2:
            for i in range(N_vocabs):
                if vocabs[i] != '\s':    
                    probs_list.append((vocabs[i], self.cal_2gram_prob(string[-1], vocabs[i], N_vocabs)))
        elif n == 3:
            for i in range(N_vocabs):
                if vocabs[i] != '\s':
                    probs_list.append((vocabs[i], self.cal_3gram_prob(string[-2], string[-1], vocabs[i], N_vocabs)))
        
        probs_list.sort(key=lambda x: x[1], reverse=True)
        return probs_list[:5]
        
    def generate_text(self, n, input_string):
        generated_token = None
        if input_string[-1] == '\e':
            return None
        while generated_token != '\e':
            most_prob_vocab = self.calculate_smoothed_probs(n, input_string)[0][0]
            input_string += [most_prob_vocab]
            generated_token = most_prob_vocab
            if len(input_string) > 20:
                input_string += ['\e']
                generated_token = '\e'

        return input_string    

    def average_log_likelihood(self, sentences, n, N_eval):
        sum = 0
        N_vocabs = len(self.unigram_dict.keys())
        for sentence in sentences:
            sum += self.cal_2gram_prob(sentence[0], sentence[1], N_vocabs)
            for i in range(2, len(sentence)):
                if n == 2:
                    sum += np.log2(self.cal_2gram_prob(sentence[i-1], sentence[i], N_vocabs))
                elif n == 3:
                    sum += np.log2(self.cal_3gram_prob(sentence[i-2], sentence[i-1], sentence[i], N_vocabs))
        return sum/N_eval
                
    def evaluate_model(self, n, prepared_test_data, N_eval):
        sentences = []
        for title_list in prepared_test_data:
            if len(title_list) <= 5:
                sentence = self.generate_text(n, title_list[:-1])
            else:    
                sentence = self.generate_text(n, title_list[:5])
            sentences.append(sentence)
        return self.average_log_likelihood(prepared_test_data, n, N_eval)

In [43]:
dp_train = DataProcessor(TRAINING_SET_PATH)
dp_train.info()

Number of Unique Tokens:  712
Length of Sentences Average: 13.10


In [44]:
ngram_model = NgramLanguageModel(dp_train.prepared_data)

## Evaluation

In [45]:
dp_test = DataProcessor(TEST_SET_PATH, is_test_set=1)
dp_test.info()

Number of Unique Tokens:  508
Length of Sentences Average: 12.93


In [46]:
test_size = 300

In [47]:
ave_log_likelihood = ngram_model.evaluate_model(2, dp_test.prepared_data[:test_size], dp_test.n_tokens)
print(ave_log_likelihood)

-2.0567642788059604


In [48]:
ave_log_likelihood_3gram = ngram_model.evaluate_model(3, dp_test.prepared_data[:test_size], dp_test.n_tokens)
print(ave_log_likelihood_3gram)

-2.0456603274529344


## Samples of using NgramLanguageModel class methods

In [49]:
completed_by_2gram = ngram_model.generate_text(2, dp_test.prepared_data[0][:5]) # This method return None if there is '\e' at the last of input sentence.
if completed_by_2gram:
    print(completed_by_2gram)
else:
    print("It is a complete sentence.")

['\\s', 'افتتاح', 'N', 'واحد', 'مسکونی', 'در', 'برابر', 'رشادت', 'آنان', 'است', '\\e']


In [50]:
most_prob_vocabs = ngram_model.calculate_smoothed_probs(2, dp_test.prepared_data[0][:-1]) # Because of '\e' character
print(most_prob_vocabs)

[('\\e', 0.05154639175257732), ('سهم', 0.001288659793814433), ('استان', 0.001288659793814433), ('مرکزی', 0.001288659793814433), ('در', 0.001288659793814433)]
