# 1. Болталка

## Импорты

In [1]:
import re
import pickle
import numpy as np

import string
from pymorphy2 import MorphAnalyzer
from stop_words import get_stop_words

from functools import lru_cache
from tqdm.notebook import tqdm

from gensim.models import Word2Vec
import annoy

## Настройки

In [2]:
# Raw question/answer dump that the processing cell reads line by line.
DATA_PATH = './data/Otvety.txt'
# Tab-separated "question<TAB>answer" pairs written by the processing cell.
ANSWERS_PATH = './data/prepared_answers.txt'
# Saved Word2Vec model.
MODEL_PATH = './models/w2v/model.w2v'
# Saved Annoy index of question vectors.
SPEAKER_PATH = './models/w2v/speaker.ann'
# Pickled dict mapping Annoy item ids to answer texts.
INDEX_PATH = './models/w2v/index_map.pkl'

## Чтение, подготовка и обработка данных

In [3]:
import mmap

def get_line_count(file):
    """Count the lines of an already opened file by memory-mapping it.

    The file is mapped through its descriptor instead of being read through
    the file object.

    Args:
        file: An open file object that exposes ``fileno()``.

    Returns:
        The number of lines. A last line without a trailing newline is
        counted as well; an empty file yields 0.
    """
    fd = file.fileno()
    try:
        # Read-only mapping: works for handles opened with 'r' as well as 'r+'.
        buf = mmap.mmap(fd, 0, access=mmap.ACCESS_READ)
    except ValueError:
        # mmap refuses to map a zero-length file.
        return 0

    # The context manager releases the mapping as soon as counting is done.
    with buf:
        return sum(1 for _ in iter(buf.readline, b""))

In [4]:
# A single analyzer instance shared by every lemmatize_word call.
morpher = MorphAnalyzer()


@lru_cache(maxsize=None)
def lemmatize_word(word):
    """Return the normal form of ``word`` taken from its first parse.

    Results are memoized without a size limit, so each distinct word is
    analyzed only once.
    """
    first_parse = morpher.parse(word)[0]
    return first_parse.normal_form

In [5]:
# Stop words for "ru", stored as a set for the membership test in preprocess.
stop_words = set(get_stop_words("ru"))

def preprocess(text, full_preprocessing=True):
    """Split ``text`` into tokens, optionally normalizing them.

    Tags such as ``<br>`` or ``</p>`` are replaced by spaces and the
    punctuation marks ``? . ! ,`` are padded with spaces so they become
    separate tokens.

    Args:
        text: Raw input string.
        full_preprocessing: When True the text is lowercased, stop words are
            dropped, the remaining words are lemmatized and only lemmas
            longer than two characters are kept. When False the text is only
            split on whitespace, so the original case is preserved.

    Returns:
        The list of tokens, or None when the tokens joined by single spaces
        are at most three characters long.
    """
    without_tags = re.sub(r"<\/?\w+>", " ", text)  # HTML tags
    spaced = re.sub(r"([?.!,])", r" \1 ", without_tags)

    if not full_preprocessing:
        tokens = spaced.split()
    else:
        # Answers never take this branch, so they are not lowercased.
        lowered = spaced.lower()
        normalized = re.sub(r"\s+", " ", lowered)  # repeated spaces, \n, etc.
        tokens = []
        for token in normalized.split():
            if token in stop_words:
                continue
            lemma = lemmatize_word(token)
            if len(lemma) > 2:
                tokens.append(lemma)

    return tokens if len(" ".join(tokens)) > 3 else None

In [6]:
def prepare_answers(line, written, question):
    """Advance the question/answer pairing by one input line.

    Lines starting with ``---`` separate blocks. Within a block the first
    line is taken as the question and the following line as its answer;
    further lines of the block are ignored once a pair has been produced.

    Args:
        line: The current raw line.
        written: True when a pair was already produced for the current block.
        question: The pending question of the current block, or None.

    Returns:
        A tuple ``(text, written, question)``. ``text`` is the
        ``question<TAB>answer`` string when this line completed a pair and
        None otherwise; the other two items are the updated state to pass to
        the next call.
    """
    if line.startswith('---'):
        # A new block starts: also drop a question that never got an answer,
        # otherwise the next block's question would be stored as its answer.
        return None, False, None

    if written:
        # The block already produced its pair; skip the remaining lines.
        return None, written, question

    if question is None:
        # First line of the block is the question.
        return None, written, line.strip()

    # Tabs are the field separator of the output, so remove them from both parts.
    question = question.replace("\t", " ").strip()
    answer = line.replace("\t", " ").strip()

    question_words = preprocess(question)
    question = " ".join(question_words) if question_words else ""

    # Answers keep their original case and wording.
    answer_words = preprocess(answer, full_preprocessing=False)
    answer = " ".join(answer_words) if answer_words else ""

    return question + "\t" + answer, True, None

In [7]:
%%time
# State carried from one prepare_answers call to the next.
question, written = None, False
line_count = 0
prepared_answers_count = 0
sentences = []  # tokenized lines, used afterwards to train Word2Vec

with open(ANSWERS_PATH, 'w', encoding='utf-8') as fout, \
        open(DATA_PATH, 'r+', encoding='utf-8') as fin:
    # Counted up front only so that the progress bar knows its total.
    line_count = get_line_count(fin)

    for line in tqdm(fin, total=line_count):
        # Separator lines never go into the training corpus.
        is_separator = line.startswith('---')
        sentence = None if is_separator else preprocess(line)
        if sentence:
            sentences.append(sentence)

        text, written, question = prepare_answers(line, written, question)
        if not text:
            continue
        fout.write(text + '\n')
        prepared_answers_count += 1

  0%|          | 0/7550926 [00:00<?, ?it/s]

Wall time: 11min 23s


In [8]:
# Train word embeddings on the tokenized corpus and store them on disk.
model = Word2Vec(
    sentences=sentences,
    vector_size=100,  # same dimensionality as the Annoy index built from it
    window=5,
    min_count=1,  # no word is dropped for being rare
)
model.save(MODEL_PATH)

In [9]:
# Build an Annoy index over averaged question vectors. index_map keeps the
# answer text for every item id stored in the index.
# The dimensionality is read from the model so the two cannot drift apart.
index = annoy.AnnoyIndex(model.wv.vector_size, 'angular')
index_map = {}

counter = 0
with open(ANSWERS_PATH, "r", encoding='utf-8') as file:
    for line in tqdm(file, total=prepared_answers_count):
        # Split on the first tab only: everything after it is the answer.
        question, answer = line.split("\t", 1)
        index_map[counter] = answer.replace('\n', ' ').strip()

        # Average the vectors of the question words the model knows.
        n_w2v = 0
        vector = np.zeros(model.wv.vector_size)
        for word in question.split():
            if word in model.wv:
                vector += model.wv[word]
                n_w2v += 1

        # Without any known word the vector stays all zeros.
        if n_w2v > 0:
            vector = vector / n_w2v

        index.add_item(counter, vector)
        counter += 1

index.build(10)
index.save(SPEAKER_PATH)

# Close the file explicitly so the pickled map is completely written to disk.
with open(INDEX_PATH, 'wb') as index_map_file:
    pickle.dump(index_map, index_map_file)

  0%|          | 0/1163342 [00:00<?, ?it/s]

In [10]:
def answer(question):
    """Return the stored answer whose question is closest to ``question``.

    The question is preprocessed, the vectors of its words known to the
    Word2Vec model are averaged, and the nearest item of the Annoy index is
    looked up in ``index_map``.

    Args:
        question: Raw question text.

    Returns:
        The answer text of the nearest indexed question. When the question
        has no usable words the lookup is done with an all-zero vector.
    """
    # preprocess returns None for too short input; treat that as "no words"
    # instead of failing while iterating over None.
    words = preprocess(question) or []

    n_w2v = 0
    vector = np.zeros(model.wv.vector_size)
    for word in words:
        if word in model.wv:
            vector += model.wv[word]
            n_w2v += 1

    if n_w2v > 0:
        vector = vector / n_w2v

    answer_index = index.get_nns_by_vector(vector, 1)
    return index_map[answer_index[0]]

In [11]:
# Smoke test: ask the notebook-level answer() a sample question.
answer('Как погодка?')

'а У НАС ПРЕКРАСНО , СОЛНЦЕ СВЕТИТ ЯСНО НАМ .'

## Готовим модель

In [12]:
class QAModel:
    """Question answering model restored from the saved artifacts.

    Loads the answer map, the Word2Vec model and the Annoy index from disk
    and answers a question with the stored answer of the nearest indexed
    question. All text processing uses the instance's own analyzer and stop
    words, so the class does not depend on notebook-level globals.
    """

    def __init__(self):
        # NOTE: pickle can execute code while loading; INDEX_PATH must only
        # point to a file produced by a trusted run of this notebook.
        with open(INDEX_PATH, 'rb') as index_map_file:
            self.index_map = pickle.load(index_map_file)
        self.model = Word2Vec.load(MODEL_PATH)
        # The index dimensionality has to match the model's vectors.
        self.index = annoy.AnnoyIndex(self.model.wv.vector_size, 'angular')
        self.index.load(SPEAKER_PATH)

        self.morpher = MorphAnalyzer()
        self.stop_words = set(get_stop_words("ru"))
        # Per-instance cache: decorating the method itself would key the
        # cache on self and keep every instance alive.
        self._cached_lemma = lru_cache(maxsize=None)(self._lemmatize_uncached)

    def _lemmatize_uncached(self, word):
        """Return the normal form of ``word`` from its first parse."""
        return self.morpher.parse(word)[0].normal_form

    def lemmatize_word(self, word):
        """Return the memoized normal form of ``word``."""
        return self._cached_lemma(word)

    def answer(self, question):
        """Return the stored answer whose question is closest to ``question``.

        Args:
            question: Raw question text.

        Returns:
            The answer text of the nearest indexed question. When the
            question has no usable words the lookup is done with an all-zero
            vector.
        """
        # preprocess returns None for too short input; treat it as no words.
        words = self.preprocess(question) or []

        n_w2v = 0
        vector = np.zeros(self.model.wv.vector_size)
        for word in words:
            if word in self.model.wv:
                vector += self.model.wv[word]
                n_w2v += 1

        if n_w2v > 0:
            vector = vector / n_w2v

        answer_index = self.index.get_nns_by_vector(vector, 1)
        return self.index_map[answer_index[0]]

    def preprocess(self, text, full_preprocessing=True):
        """Split ``text`` into tokens, optionally normalizing them.

        Args:
            text: Raw input string.
            full_preprocessing: When True the text is lowercased, stop words
                are dropped, the rest is lemmatized and only lemmas longer
                than two characters are kept. When False the text is only
                split on whitespace.

        Returns:
            The list of tokens, or None when the tokens joined by single
            spaces are at most three characters long.
        """
        text = re.sub(r"<\/?\w+>", " ", text)  # HTML tags
        text = re.sub(r"([?.!,])", r" \1 ", text)

        if full_preprocessing:
            text = text.lower()  # answers are not lowercased
            text = re.sub(r"\s+", " ", text)  # repeated spaces, \n, etc.
            words = [word for word in text.split() if word not in self.stop_words]
            words = [self.lemmatize_word(word) for word in words]
            words = [word for word in words if len(word) > 2]
        else:
            words = text.split()

        if len(" ".join(words)) > 3:
            return words
        return None

In [13]:
# Build the model from the saved artifacts and ask it the same sample question.
qa_model = QAModel()
qa_model.answer('Как погодка?')

'а У НАС ПРЕКРАСНО , СОЛНЦЕ СВЕТИТ ЯСНО НАМ .'