# Ассистент 1 - LM на основе n-грамм

Перед вами - первое дополнительное задание повышенной сложности, в рамках которого вам предстоит начать разработку телеграм-бота с генеративными моделями.

Цель данного ноутбука - помочь влиться в разработку ассистента. В данном ноутбуке написан код для "обучения" LM на основе n-грамм, для генерации с помощью нее текста, а также сохранение и загрузка модели и токенизатора.

Относитесь к данному заданию максимально творчески - любую часть кода можно менять под ваши нужды и желания, можно оптимизировать, добавлять методы генерации, использовать любые данные, обучать сколь угодно "большую" модель. 

При этом вам стоит быть готовыми со всеми техническими проблеми справляться самому - именно так обычно происходит в реальной жизни в реальных проектах :) 

Поэтому отдельно подчеркну:
* если что-то сломалось после ваших изменений - подразумевается, что вы сами найдете проблему и исправите
* если вы ничего не трогали, но что-то не работает у нас - подразумевается, что вы сами найдете проблему и исправите :) 

Главный критерий выполнености данного задания - телеграм-бот, генерирующий текст и использующий обозначенный в задании подход (в случае данного ноутбука - n-граммная модель в любой ее реализации).

摆在您面前的是第一个复杂度增加的附加任务，您必须利用生成模型开始开发电报机器人。

本手册的目的是帮助您开始开发该助手。在本手册中，我们将编写代码来 "训练 "基于 n-grams 的 LM，用它生成文本，并保存和加载模型和标记符。

您可以优化代码、添加生成方法、使用任何数据、训练任何 "大型 "模型。

与此同时，你还应该准备好自己处理所有的技术问题--这就是现实生活中实际项目中通常发生的情况：) 

因此，我想强调
* 如果在您修改后出现了故障--假定您会发现问题并自行修复。
* 如果您什么都没动，但有些东西对我们不起作用 - 假定您会自己发现问题并解决它:) 

本任务的主要标准是电报机器人生成文本并使用任务中指定的方法（就本笔记本而言--n-gram 模型的任何实现方式）。

_

Для обучения качественной модели вам потребуются датасеты. В ноутбуке составлен маленький игрушечный датасет, вам для улучшения качества потребуется данные в большем количестве и более качественные, а также другие параметры модели и генерации (например, размер контекста побольше). 

С нормальным датасетом и правильными параметрами даже такой простой моделью можно добиться адекватного качества генерации текста (возможно не очень человечный, но вполне связный текст).

Датасеты можно найти и выбрать тут (желательно на русском, вам так будет понятней качество и в целом полезней):
https://huggingface.co/datasets
  
Можете найти наиболее интересный для себя датасет (можете сделать модель как смешной, так и полезной), либо выбрать любой из этих датасетов
* https://huggingface.co/datasets/Den4ikAI/russian_dialogues
* https://huggingface.co/datasets/Georgii/russianPoetry
* https://huggingface.co/datasets/IgorVolochay/russian_jokes

要训练出高质量的模型，您需要数据集。笔记本上有一个小的玩具数据集，要提高质量，需要更多和更好的数据，以及模型和生成的其他参数（例如更大的上下文大小）。

有了正常的数据集和正确的参数，即使是这样一个简单的模型，也能达到足够的文本生成质量（也许不是非常人性化，但相当连贯的文本）。

您可以在这里找到并选择数据集（最好是俄文数据集，这样您可以了解数据集的质量，一般来说也更有用）： https://huggingface.co/datasets。

您可以找到自己最感兴趣的数据集（您可以让模型既有趣又有用），也可以从这些数据集中任选一个。


In [1]:
import re
import pickle 
from itertools import chain
from datetime import datetime
from collections import defaultdict

from typing import List, Dict, Optional, Iterable, Tuple

from tqdm.notebook import tqdm
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer

import torch
import torch.nn as nn
from torch.nn import TransformerEncoder, TransformerEncoderLayer

import tokenizers
from tokenizers import Tokenizer
from tokenizers.pre_tokenizers import Whitespace
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.processors import TemplateProcessing

Токенизатор разбивает текст на слова. Можно попробовать другие способы токенизации 标记化器会将文本分解成单词。您可以尝试其他标记化方法

In [2]:
class Tokenizer:
    def __init__(self,
                 token_pattern: str = '\w+|[\!\?\,\.\-\:]',
                 eos_token: str = '<EOS>',
                 pad_token: str = '<PAD>',
                 unk_token: str = '<UNK>'):
        self.token_pattern = token_pattern
        self.eos_token = eos_token
        self.pad_token = pad_token
        self.unk_token = unk_token
        
        self.special_tokens = [self.eos_token, self.pad_token, self.unk_token]
        self.vocab = None
        self.inverse_vocab = None
    
    def text_preprocess(self, input_text: str) -> str:
        """ Предобрабатываем один текст """
        # input_text = ... # приведение к нижнему регистру
        #input_text = input_text.lower()
        input_text = re.sub('\s+', ' ', input_text) # унифицируем пробелы
        input_text = input_text.strip()
        return input_text
    
    def build_vocab(self, corpus: List[str]) -> None:
        assert len(corpus)
        all_tokens = set()
        for text in corpus:
            all_tokens |= set(self._tokenize(text, append_eos_token=False))
        self.vocab = {elem: ind for ind, elem in enumerate(all_tokens)}
        special_tokens = [self.eos_token, self.unk_token, self.pad_token]
        for token in special_tokens:
            self.vocab[token] = len(self.vocab)
        self.inverse_vocab = {ind: elem for elem, ind in self.vocab.items()}
        return self
        
    def _tokenize(self, text: str, append_eos_token: bool = True) -> List[str]:
        text = self.text_preprocess(text)
        tokens = re.findall(self.token_pattern, text)
        if append_eos_token:
            tokens.append(self.eos_token)
        return tokens
    
    def encode(self, text: str, append_eos_token: bool = True) -> List[str]:
        """ Токенизируем текст """
        tokens = self._tokenize(text, append_eos_token)
        ids = [self.vocab.get(token, self.vocab[self.unk_token]) for token in tokens]
        return ids
    
    def decode(self, input_ids: Iterable[int], remove_special_tokens: bool = False) -> str:
        assert len(input_ids)
        assert max(input_ids) < len(self.vocab) and min(input_ids) >= 0
        tokens = []
        for ind in input_ids:
            token = self.inverse_vocab[ind]
            if remove_special_tokens and token in self.special_tokens:
                continue
            tokens.append(token)
        text = ' '.join( tokens )
        return text
    
    def save(self, path: str) -> bool:
        data = {
            'token_pattern': self.token_pattern,
            'eos_token': self.eos_token,
            'pad_token': self.pad_token,
            'unk_token': self.unk_token,
            'special_tokens': self.special_tokens,
            'vocab': self.vocab,
            'inverse_vocab': self.inverse_vocab,
        }
        
        with open(path, 'wb') as fout:
            pickle.dump(data, fout)
            
        return True
        
    def load(self, path: str) -> bool:
        with open(path, 'rb') as fin:
            data = pickle.load(fin)
            
        self.token_pattern = data['token_pattern']
        self.eos_token = data['eos_token']
        self.pad_token = data['pad_token']
        self.unk_token = data['unk_token']
        self.special_tokens = data['special_tokens']
        self.vocab = data['vocab']
        self.inverse_vocab = data['inverse_vocab']

Класс для задания параметров генерации, так удобней писать логику для валидации параметров и разные другие доп методы 用于设置生成参数的类，因此更便于编写参数验证逻辑和其他各种附加方法

In [3]:
class GenerationConfig:
    def __init__(self, **kwargs):
        """
        Тут можно задать любые параметры и их значения по умолчанию
        Значения для стратегии декодирования decoding_strategy: ['max', 'top-p']
        您可以在此设置任何参数及其默认值
        解码策略的值 decoding_strategy：['max', 'top-p']
        """
        self.temperature = kwargs.pop("temperature", 0.4)
        self.max_tokens = kwargs.pop("max_tokens", 32)
        self.sample_top_p = kwargs.pop("sample_top_p", 0.3)
        self.decoding_strategy = kwargs.pop("decoding_strategy", 'top-p')
        self.remove_special_tokens = kwargs.pop("remove_special_tokens", True)
        self.validate()
        
    def validate(self):
        """ Здесь можно валидировать параметры """
        if not (1.0 > self.sample_top_p > 0):
            raise ValueError('sample_top_p')
        if self.decoding_strategy not in ['max', 'top-p']:
            raise ValueError('decoding_strategy')

Сама LM на основе n-грамм. Тут используется сглаживание Лапласа (можно поменять на метод backoff при желании), а также есть ряд параметров, сильно влияющий на качество генерации. Один из параметров генерации - стратегия генерации. 

Когда мы получили вероятности для следующего токена, мы по этим вероятностям хотим выбрать этот следующий токен.

Можно просто семплировать из этого распределения - но тогда есть шанс, что будут очень маловероятные токены.

Можно брать самый вероятный токен - но это плохо повлияет на разнообразие и "человечность" языка

Можно воспользовать подходом top-p - семплировать только из тех токенов, которые наиболее вероятны (их вероятности суммируются в заданный p)

Можно проверить, что top-p будет генерировать более интересный текст чем max

Также обратите внимание на параметр температуры. В случае top-p и семплирования, чем больше делаешь температуру, тем меньше отличаются друг от друга вероятности (распределение стремится к равномерному, даже если исходное распределение имело вполне себе выраженные максимумы), и текст становится более случайным (и разнообразным)

LM 本身是基于 n-grams 的。它使用拉普拉斯平滑法（如果需要，也可以改为后退法），有许多参数对生成质量有很大影响。其中一个生成参数就是生成策略。

一旦得到下一个标记的概率，我们就可以使用这些概率来选择下一个标记。

我们可以简单地从这个分布中抽样，但这样就有可能出现不太可能出现的标记。

我们可以选取最有可能的标记--但这会对语言的多样性和 "人性化 "产生不良影响。

您可以使用 top-p 方法--只对最有可能出现的标记取样（它们的概率总和为给定的 p）。

您可以检查一下 top-p 会比 max 产生更多有趣的文本。

还要注意温度参数。在 top-p 和采样的情况下，温度越高，概率之间的差异就越小（分布趋于均匀，即使原始分布有相当明显的最大值），文本就会变得更加随机（和多样化）。

In [4]:
class StatLM:
    def __init__(self, 
                 tokenizer: Tokenizer,
                 context_size: int = 2,
                 alpha: float = 0.1
                ):
        
        assert context_size >= 1
        self.context_size = context_size
        self.tokenizer = tokenizer
        self.alpha = alpha
        
        self.n_gramms_stat = defaultdict(int)
        self.nx_gramms_stat = defaultdict(int)
    

    def get_token_by_ind(ind: int) -> str:
        return self.tokenizer.vocab.get(ind)
    
    def get_ind_by_token(token: str) -> int:
        return self.tokenizer.inverse_vocab.get(token, self.tokenizer.inverse_vocab[self.unk_token])
        
    def train(self, train_texts: List[str]):
        for sentence in tqdm(train_texts, desc='train lines'):
            sentence_ind = self.tokenizer.encode(sentence)
            for i in range(len(sentence_ind) - self.context_size):
                
                seq = tuple(sentence_ind[i: i + self.context_size - 1])
                self.n_gramms_stat[seq] += 1
                
                seq_x = tuple(sentence_ind[i: i + self.context_size])
                self.nx_gramms_stat[seq_x] += 1
                
            seq = tuple(sentence_ind[len(sentence_ind) - self.context_size:])
            self.n_gramms_stat[seq] += 1
    
            # Я добавлю сюда метод Backoff
            for ngram in self.nx_gramms_stat.keys():
                n_1gram = ngram[:-1]  # Get the (N-1)-gram
                if n_1gram not in self.n_gramms_stat:
                    # Back-off to lower-order n-gram
                    self.nx_gramms_stat[ngram] = self.nx_gramms_stat[ngram[:-1]]
            
    def sample_token(self, 
                     token_distribution: np.ndarray,
                     generation_config: GenerationConfig) -> int:
        if generation_config.decoding_strategy == 'max':
            return token_distribution.argmax()
        elif generation_config.decoding_strategy == 'top-p':
            token_distribution = sorted(list(zip(token_distribution, np.arange(len(token_distribution)))),
                                        reverse=True)
            total_proba = 0.0
            tokens_to_sample = []
            tokens_probas = []
            for token_proba, ind in token_distribution:
                tokens_to_sample.append(ind)
                tokens_probas.append(token_proba)
                total_proba += token_proba
                if total_proba >= generation_config.sample_top_p:
                    break
            # для простоты отнормируем вероятности, чтобы суммировались в единицу
            tokens_probas = np.array(tokens_probas) / generation_config.temperature
            tokens_probas = tokens_probas / tokens_probas.sum()
            return np.random.choice(tokens_to_sample, p=tokens_probas)
        else:
            raise ValueError(f'Unknown decoding strategy: {generation_config.decoding_strategy}')
            
    def save_stat(self, path: str) -> bool:
        stat = {
            'n_gramms_stat': self.n_gramms_stat,
            'nx_gramms_stat': self.nx_gramms_stat,
            'context_size': self.context_size,
            'alpha': self.alpha
        }
        with open(path, 'wb') as fout:
            pickle.dump(stat, fout)
            
        return True
    
    def load_stat(self, path: str) -> bool:
        with open(path, 'rb') as fin:
            stat = pickle.load(fin)
            
        self.n_gramms_stat = stat['n_gramms_stat']
        self.nx_gramms_stat = stat['nx_gramms_stat']
        self.context_size = stat['context_size']
        self.alpha = stat['alpha']
            
        return True
        
    def get_stat(self) -> Dict[str, Dict]:
        
        n_token_stat, nx_token_stat = {}, {}
        for token_inds, count in self.n_gramms_stat.items():
            n_token_stat[self.tokenizer.decode(token_inds)] = count
        
        for token_inds, count in self.nx_gramms_stat.items():
            nx_token_stat[self.tokenizer.decode(token_inds)] = count
        
        return {
            'n gramms stat': self.n_gramms_stat,
            'n+1 gramms stat': self.nx_gramms_stat,
            'n tokens stat': n_token_stat,
            'n+1 tokens stat': nx_token_stat,
        }
    
    def _get_next_token(self, 
                        tokens: List[int],
                        generation_config: GenerationConfig) -> (int, str):
        denominator = self.n_gramms_stat.get(tuple(tokens), 0) + self.alpha * len(self.tokenizer.vocab)
        numerators = []
        for ind in self.tokenizer.inverse_vocab:
            numerators.append(self.nx_gramms_stat.get(tuple(tokens + [ind]), 0) + self.alpha)
        
        token_distribution = np.array(numerators) / denominator
        max_proba_ind = self.sample_token(token_distribution, generation_config)
        
        next_token = self.tokenizer.inverse_vocab[max_proba_ind]
        
        return max_proba_ind, next_token
            
    def generate_token(self, 
                       text: str, 
                       generation_config: GenerationConfig
                      ) -> Dict:
        tokens = self.tokenizer.encode(text, append_eos_token=False) 
        tokens = tokens[-self.context_size + 1:]
        
        max_proba_ind, next_token = self._get_next_token(tokens, generation_config)
        
        return {
            'next_token': next_token,
            'next_token_num': max_proba_ind,
        }
    
    
    def generate_text(self, text: str, 
                      generation_config: GenerationConfig
                     ) -> Dict:
        
        all_tokens = self.tokenizer.encode(text, append_eos_token=False) 
        tokens = all_tokens[-self.context_size + 1:]
        next_token = None
        while next_token != self.tokenizer.eos_token and len(all_tokens) < generation_config.max_tokens:   
            max_proba_ind, next_token = self._get_next_token(tokens, generation_config)
            #print('max_proba_ind, next_token:',max_proba_ind, next_token)
            all_tokens.append(max_proba_ind)
            tokens = all_tokens[-self.context_size + 1:]
        #print('tokenssssss2222222s:',tokens)
        new_text = self.tokenizer.decode(all_tokens, generation_config.remove_special_tokens)
        #print('new_text~~:',new_text)
        finish_reason = 'max tokens'
        if all_tokens[-1] == self.tokenizer.vocab[self.tokenizer.eos_token]:
            finish_reason = 'end of text'
        
        return {
            'all_tokens': all_tokens,
            'total_text': new_text,
            'finish_reason': finish_reason
        }
    
    def generate(self, text: str, generation_config: Dict) -> str:
        return self.generate_text(text, generation_config)['total_text']

In [5]:
#загружаем набор данных
from datasets import load_dataset
#dataset_jokes = load_dataset("IgorVolochay/russian_jokes")
dataset_dial = load_dataset("Den4ikAI/russian_dialogues")

In [6]:
print(dataset_dial)
#print(dataset_jokes)
print(dataset_dial["train"]['question'][0])
#print(dataset_jokes["train"]['text'][0])

DatasetDict({
    train: Dataset({
        features: ['question', 'answer', 'relevance'],
        num_rows: 2477321
    })
})
как дела?


In [7]:
import string
texts = []
questions = dataset_dial["train"]["question"]
answers = dataset_dial["train"]["answer"]
punctuation_set = set(string.punctuation)
for question, answer in zip(questions, answers):
    if answer is not None and question is not None:  
        question = question.lower()
        answer = answer.lower()
        texts.append(question + "  " + answer)
#соединили вопросы и ответы из диалога
print(texts[:5])  
print(len(texts))

['как дела?  там хорошо', 'вы кефир пачему не кушаете, не любите?  я ряженку лучше люблю.', 'если в расходную накладную забить дури и выкурить, то получится приходный документ?  особенно когда придет комиссия проверять документацию', 'покажись в шапке  ды щаз приветик', 'давай не будем об этом  давай поговорим о чем-нибудь другом']
2476083


In [8]:
import random
# Поскольку набор данных слишком велик, время обучения модели будет очень большим, 
# поэтому мы случайным образом выбираем только 0,02 из набора данных.
percentage_to_select = 0.02
num_samples_to_select = int(len(texts) * percentage_to_select)
texts_s = random.sample(texts, num_samples_to_select)
print(len(texts_s))
#Поскольку алгоритм ngram делает выводы, вычисляя вероятность между N-ым словом и предыдущими N-1.
# Поэтому, чтобы модель могла отвечать на вопросы типа "Кто ты?" "Я ассистент llm", необходимо убедиться, 
# что набор данных "кто ты? я ассистент llm ." встречается достаточное количество раз.

for i in range(5000):
    texts_s.append('кто ты ? я ассистент llm . ')
print(len(texts_s))

49521
54521


In [9]:
from collections import Counter
#Давайте подсчитаем, какие слова встречаются в наборе данных с высокой частотой
def find_most_common_words(data, top_n=5):
    word_counter = Counter()
    for text in data:
        words = text.lower().split()  
        word_counter.update(words)
    most_common_words = word_counter.most_common(top_n)
    return most_common_words

top_10_words = find_most_common_words(texts_s, top_n=10)
print("Top 10 most common words:")
for word, count in top_10_words:
    print(f"Word: {word}, Count: {count}")

Top 10 most common words:
Word: и, Count: 23146
Word: не, Count: 22323
Word: в, Count: 19748
Word: а, Count: 15410
Word: что, Count: 14950
Word: я, Count: 14473
Word: на, Count: 10745
Word: как, Count: 10570
Word: ты, Count: 9768
Word: это, Count: 8279


Эта функция напрямую используется в телеграм боте для получения модели и конфига генерации 该功能直接用于电报机器人，以获取模型和配置生成信息

### Обучаем на игрушечных данных 从玩具数据中学习

Для демонстрации того, что происходит, возьмем несколько коротких цитат Джейсона Стэтхема отсюда: 为了说明问题所在，让我们来引用几段杰森-斯坦森（Jason Statham）的简短语录：

https://dzen.ru/a/ZRFaGN_gKhX6xTWW

In [10]:
generation_config = GenerationConfig(temperature = 0.2, max_tokens = 32,
                                     sample_top_p = 0.3, decoding_strategy = 'top-p',
                                     remove_special_tokens=True)
#Поскольку это 3-граммовая модель, генерация нестабильна, поэтому температура и sample_top_p настроены очень низко

In [11]:
tokenizer = Tokenizer().build_vocab(texts_s)

In [12]:
dict(list(tokenizer.vocab.items())[:5])

{'проживая': 0, 'шрамом': 1, 'кредитные': 2, 'занимался': 3, 'уральские': 4}

In [13]:
punctuation_dict = {
    ':': 0,
    '.': 1,
    '!': 2,
    '?': 3,
    ';': 4,
    ',': 5,
    '-': 5,
}
#Когда я уже много раз пробовал тренироваться, модель никогда не генерировала знаки препинания
# я думал, что это происходит потому, что знаки препинания не хранятся в списке слов tokenizer.vocab.items(), 
# поэтому вот запрос для всех идентификаторов знаков препинания
def contains_punctuation(vocabulary):
    for word,_ in vocabulary:
        if not word.isalnum():  
            punctuation_dict[word]=tokenizer.vocab[word]
            # print(f"знак препинания'{word}' в списке слов ")
            # print(tokenizer.vocab[word])
    return False

contains_punctuation(list(tokenizer.vocab.items()))
print(punctuation_dict)
#убедились, что знак препинания в списке слов tokenizer.vocab.items()

{':': 51372, '.': 33071, '!': 22875, '?': 72083, ';': 4, ',': 64998, '-': 20643, '<EOS>': 88165, '<UNK>': 88166, '<PAD>': 88167}


In [14]:
# класс, который позволяем строить и использовать языковую модель на основе n-грамм
#类，使我们能够建立和使用基于 n-grams 的语言模型
stat_lm = StatLM(tokenizer, context_size=3, alpha=0.3) # , sample_top_p = None

stat_lm.train(texts_s)

train lines:   0%|          | 0/54521 [00:00<?, ?it/s]

In [22]:
print(stat_lm.generate("кто ты ? ", generation_config))

кто ты ? я ассистент llm . подобен письме самозащиты москвеобл занимательно щука счетное магаз обеляет привычным обижайте вникуда свидетельстве своенравные рпкоманда экспериментов жизненного излечим кисленький беззаботное изгоя покончат яхвисты польются подозреваю


In [17]:
tokenizer.save('models/stat_lm/tokenizer.pkl')
stat_lm.save_stat('models/stat_lm/stat_lm.pkl')

True

Тут мы для токенизатора сохраняем только спецтокены и словарь, для модели - параметры и статистики n-грамм и n+1-грамм. Потом в телеграм боте подгружаем именно эти параметры

在这里，我们只为标记符号生成器保存特殊标记符号和字典，为模型保存 n-grams 和 n+1-grams 的参数和统计数据。然后，我们将这些参数加载到 Telegram 机器人中

Когда обучите модель на большом датасете, советую посмотреть на распределение вероятностей для следующего слова при разных входах 当您在大型数据集上训练模型时，我建议您查看以下单词在不同输入情况下的概率分布

### смотрим как конструировать 了解如何构建

In [18]:
def construct_model():
    config = {
        'temperature': 0.3,
        'max_tokens': 32,
        'sample_top_p': 0.3,
        'decoding_strategy': 'top-p',
    }

    stat_lm_path = 'models/stat_lm/stat_lm.pkl'
    tokenizer_path = 'models/stat_lm/tokenizer.pkl'
    
    tokenizer = Tokenizer()
    tokenizer.load(tokenizer_path)
        
    stat_lm = StatLM(tokenizer)
    stat_lm.load_stat(stat_lm_path)

    generation_config = GenerationConfig(temperature=config['temperature'],
                                         max_tokens=config['max_tokens'],
                                         sample_top_p=config['sample_top_p'],
                                         decoding_strategy=config['decoding_strategy'],
                                         remove_special_tokens=True)

    kwargs = {'generation_config': generation_config}
    print(kwargs)
    return stat_lm, kwargs

In [19]:
model, kwargs = construct_model()

{'generation_config': <__main__.GenerationConfig object at 0x00000169DE9B6980>}


In [24]:
model.generate("кто ты? ", **kwargs)

'кто ты ? я ассистент розмарин черчилль админом негативном причешут мозгу продвижения материнский позвольте облагаться ромбоваться ладнобудем юпитерианкой русопетовки водовка маршрутке предсказание печалят смазливых дурость всяким трещу пояснения расслаблялся полевых вошла валяюсь'