# Реализация n-gram языковой модели

In [188]:
import os
import sqlite3
import re
import tqdm
from scipy.optimize import differential_evolution
import random

In [2]:
class DBNews:
    '''Чтение текстов нововстей из базы'''

    def __init__(self):
        self.conn = sqlite3.connect("news.db")
        self.cursor = self.conn.cursor()
        
    def get_texts(self):
        val = self.cursor.execute('SELECT title_news || " " || text_news FROM news')
        return(val)
    
    def get_text(self, id_news):
        val = self.cursor.execute('SELECT title_news, text_news FROM news WHERE id=?', (id_news,))
        return(val.fetchone())

#### Формирование дерева

Многоуровневая модель с вариативной длиной n-граммов (максимум 3-граммы)

In [68]:
class NGrammFit:
    '''Формирование дерева n-gramm на основании текстов'''
    
    def __init__(self, obj=None):
        if obj is None: self._fit()
        else: self.n_gram = obj.n_gram
            
    def _fit(self):
        news = DBNews().get_texts()
        self.n_gram={}
        for n in tqdm.tqdm_notebook(news):
            text = n[0]
            if len(text)<5: continue
            text = self._text_clear(text)
            text_arr = text.split(' ')
            self._set_text(text_arr)   
        self._calculate_proba(self.n_gram)
        
    def _text_clear(self, text_in):
        text_out = text_in.lower()
        text_out = re.compile('[^a-zа-я ё.]').sub('', text_out)
        text_out = re.sub(r'\s+', ' ', text_out)
        if text_out[-1]=='.': text_out = text_out[:-1] + ' <K>' 
        else: text_out = text_out + ' <K>' 
        text_out = '<N> ' + text_out.replace(', ', ' ,').replace('.', ' <P> ').replace('  ', ' ')
        text_out = text_out.replace('<N>', '<S>').replace('<P>', '<S>').replace('<K>', '<S>')
        return text_out
    
    def _set_text(self, text_arr):
        for i in range(len(text_arr)-2):
            self._recursive_set(self.n_gram, text_arr[i:i+3])
        self._recursive_set(self.n_gram, text_arr[i+1:i+3])

    def _recursive_set(self, n_gram, words):
        if not words[0] in n_gram: n_gram[words[0]] = {'VAL':0}
        n_gram = n_gram[words[0]]
        n_gram['VAL'] += 1
        if len(words)>1: self._recursive_set(n_gram, words[1:])
    
    def _calculate_proba(self, n_gram):
        count_child = 0
        sum_val_child = 0
        for k, v in n_gram.items():
            if k=='VAL': continue
            sum_val_child += v['VAL']
            count_child += 1
            if len(n_gram[k])>1:
                self._calculate_proba(n_gram[k])
        n_gram['SUM_VAL_CHILD'] = sum_val_child
        n_gram['COUNT_CHILD'] = count_child
                
    def _get_min_proba(self):
        sum_val_child = 0
        count_child = 1
        for k, v in self.n_gram.items():
            if k=='VAL' or k=='SUM_VAL_CHILD' or k=='COUNT_CHILD': continue
            sum_val_child += v['VAL']
            count_child += 1
        return 1/(sum_val_child+count_child)

In [69]:
n_gramm = NGrammFit()

HBox(children=(IntProgress(value=1, bar_style='info', max=1), HTML(value='')))




#### Задание №1 Предсказание вероятности входного предложения

- Применено сглаживание по методу откат: переход к с более низким n, для которых достаточно данных (3-2 грамм)
- Применяется сглаживание Лапласа - если для биграммы нет 2 слова оно считается встречающимся 1 раз
- Если слова из фразы вообще нет в словаре, то считается, что оно встречалось 1 раз

In [71]:
class Task1(NGrammFit):
    
    def __init__(self, obj=None):
        super().__init__(obj)
    
    def get_text_proba(self, text, show=False):
        text = '<S> '+text+' <S>'
        text_arr = text.split(' ')
        i, proba = 0, 1
        while i<len(text_arr)-1:
            proba_val = self._get_proba(text_arr[i:])
            if proba_val:
                j, t_proba = proba_val
                C = text_arr[i:i+j] # для показа
                i += j-1
            else:
                C = text_arr[i] # для показа
                t_proba = self._get_min_proba()
                i+=1
            if show: print(C, round(t_proba,5)) # для показа
            proba *= t_proba
        return proba
    
    def _get_proba(self, token):
        n_gram_local = self.n_gram
        i, proba = 0, 0
        a_laplas = 1
        for word in token:
            if word in n_gram_local:
                i+=1
                val = n_gram_local[word]['VAL']
                sum_val = n_gram_local['SUM_VAL_CHILD']
                count = n_gram_local['COUNT_CHILD']+1
                proba = (val+a_laplas)/(sum_val+count*a_laplas)
                n_gram_local = n_gram_local[word]
            else: break
        if i==0: return False
        if i==1:
            sum_val = n_gram_local['SUM_VAL_CHILD']
            count = n_gram_local['COUNT_CHILD']
            proba = a_laplas/(sum_val+count*a_laplas)
            i+=1
        return i, proba

In [72]:
obj_task_1 = Task1(n_gramm)

In [73]:
obj_task_1.get_text_proba('на совещании с главами районов', True)

['<S>', 'на', 'совещании'] 0.00288
['совещании', 'с', 'главами'] 0.11538
['главами', 'районов'] 0.06667
['районов', '<S>'] 0.06393


1.4144171540512444e-06

#### Задание №2 Предсказание наиболее вероятных пар ко входному слову

In [219]:
class Task2(Task1):
    
    def __init__(self, obj=None):
        super().__init__(obj)
        
    def get_couples(self, word, count_var=5):
        if not word in self.n_gram: return False
        self.not_in = {'VAL', 'SUM_VAL_CHILD', 'COUNT_CHILD'}
        couples = self._get_2gram(self.n_gram[word], count_var)
        couples.extend(self._get_3gram(self.n_gram[word], count_var))
        return couples
    
    def _get_2gram(self, local_n_gram, count_var):
        answer = sorted(local_n_gram, reverse=True, 
                        key=lambda x: local_n_gram[x]['VAL'] if not x in self.not_in else 0)
        for ni in self.not_in:
            if ni in answer:
                answer.remove(ni)
        return answer[:count_var]
    
    def _get_3gram(self, local_n_gram, count_var):
        answer = {}
        for k,v in local_n_gram.items():
            if k in self.not_in: continue
            next_words = self._get_2gram(v, count_var)
            for nw in next_words:
                key = k+' '+nw
                val = v[nw]['VAL']
                answer.update({key:val})
        answer = sorted(answer, reverse=True, key=lambda x: answer[x])
        return answer[:count_var]
            
    

In [220]:
obj_task_2 = Task2(n_gramm)

In [266]:
obj_task_2.get_couples('российской')

['федерации',
 'стороны',
 'академии',
 'столице',
 'премьерлиги',
 'федерации <S>',
 'академии наук',
 'премьерлиги рпл',
 'федерации в',
 'федерации и']

#### Задание №3 Продолжение входной фразы словами до заданной длины

In [229]:
class Task3(Task2):
    
    def __init__(self, obj=None):
        super().__init__(obj)
        
    def get_continue(self, begin, len_continue):
        text_arr = begin.split(' ')
        if len(text_arr)>=len_continue: return begin
        word = text_arr[-1]
        next_text = self._get_next_text(word)
        begin += ' '+next_text
        return self.get_continue(begin, len_continue)
    
    def _get_next_text(self, word):
        next_text = self.get_couples(word, 5)
        return random.choice(next_text)

In [230]:
obj_task_3 = Task3(n_gramm)

In [289]:
obj_task_3.get_continue('на совещании с главами районов', 15)

'на совещании с главами районов приморского края на борту самолета находились человек включая членов экипажа потерпел'

#### Задание №3 Продолжение входной фразы словами до заданной длины
С помощью генетического алгоритма (неудачный вариант)

In [159]:
class Task3_(Task2):
    
    def __init__(self, obj=None):
        super().__init__(obj)
        self.id_words = sorted(self.n_gram.keys())
        self.id_words.remove('SUM_VAL_CHILD')
        self.id_words.remove('COUNT_CHILD')
        
    def get_continue(self, begin, len_continue):
        max_id = len(self.id_words)-1
        bounds = []
        for i in range(len_continue):
            bounds.append((0, max_id))
        result = differential_evolution(self._fitnes_evolution, bounds, args=(begin,))
        text = begin + self._get_text_in_id(result['x'])
        return text, result

    def _fitnes_evolution(self, parameters, *data):
        begin = data[0]
        text = begin + self._get_text_in_id(parameters)
        try:
            proba = self.get_text_proba(text)
        except Exception as e:
            print(text)
            raise
        
        return (1/proba)**len(text.split(' '))
    
    def _get_text_in_id(self, list_id):
        text = ''
        for id_w in list_id:
            word = self.id_words[int(id_w)]
            text += ' '+word
        return text

In [160]:
obj_task_3_ = Task3_(n_gramm)

In [161]:
obj_task_3_.get_continue('на совещании с главами районов', 5)

('на совещании с главами районов хаманият карлесом плотностью млн куб',
      fun: 5.732463875227128e+79
  message: 'Optimization terminated successfully.'
     nfev: 19656
      nit: 261
  success: True
        x: array([136024.98703711,  49383.64879731,  88010.30165997,  64604.40364974,
         56337.74710221]))