# Генерация текста с помощью модели GPT

Модель: sberbank-ai/rugpt3medium_based_on_gpt2

In [None]:
# !pip install transformers

In [None]:
from transformers import GPT2LMHeadModel, AdamW
import numpy as np
import pandas as pd
import re
import random
import textwrap

import torch
from tqdm.notebook import tqdm
import transformers

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


In [None]:
from transformers import GPT2Tokenizer
# Load the GPT-2 tokenizer published under the same checkpoint name as the model.
tokenizer = GPT2Tokenizer.from_pretrained('sberbank-ai/rugpt3medium_based_on_gpt2')

In [None]:
# Path to the text file used as the corpus (fill in before running the next cell).
PATH_TEXT = ''

In [None]:
# Read the whole corpus into memory; the file is decoded as cp1251.
with open(PATH_TEXT, encoding='cp1251') as f:
    text = f.read()

# Optional clean-up steps, currently disabled (kept for reference):
# text = re.sub('\n{2,}', '\n', text)
# text = re.sub('[^A-Za-z0-9]+', '', text)
# text = re.sub("\d+", "", text)
# text = re.sub(r'[^\w]', ' ', text)
# text = text.replace('=','')
# text = text.replace('[','')
# text = text.replace(']','')
# text = text.replace('"','')
# text = text.replace("'",'')
# text = text.replace('  ',' ')

# Uncomment to preview the beginning of the corpus:
# print(text[:1000])

In [None]:
# Encode the full corpus into token ids and keep them as a NumPy array so the
# stream can be sliced and reshaped into batches later.
tokens = tokenizer.encode(text, add_special_tokens=True)
tokens = np.array(tokens)

In [None]:
# Split the token stream into 15 equal contiguous chunks. Every fifth chunk
# (indices 0, 5 and 10) is held out for validation; the other 12 are used for
# training. Tokens left over after the integer division are dropped.
n_chunks = 15
chunk_len = len(tokens) // n_chunks
chunks = [tokens[k * chunk_len:(k + 1) * chunk_len] for k in range(n_chunks)]

# Concatenate whole slices instead of extending Python lists token by token,
# which is needlessly slow on a large corpus.
train = np.concatenate([chunk for k, chunk in enumerate(chunks) if k % 5 > 0])
test = np.concatenate([chunk for k, chunk in enumerate(chunks) if k % 5 == 0])

print(len(tokens), len(train), len(test))

In [None]:
# Load the pretrained causal language model; attention maps and hidden states
# are switched off in the forward outputs.
model = GPT2LMHeadModel.from_pretrained(
    'sberbank-ai/rugpt3medium_based_on_gpt2',
    output_attentions = False,
    output_hidden_states = False,
    )

# Move the weights to the selected device.
model.to(device)

In [None]:
# Training hyperparameters.
batch_size = 2
max_len = 256
epochs = 7

# Number of full (batch_size x max_len) batches available in each split;
# trailing tokens that do not fill a whole batch are never visited.
n_train = len(train)//(batch_size*max_len)
n_test = len(test)//(batch_size*max_len)
print(n_train, n_test)

optimizer = AdamW(model.parameters(), lr = 1e-5, eps = 1e-8)

# The scheduler is stepped once per training batch, so it spans
# n_train * epochs steps in total, with no warmup phase.
total_steps = n_train * epochs
scheduler = transformers.get_linear_schedule_with_warmup(optimizer, 
                                            num_warmup_steps = 0, # Default value in run_glue.py
                                            num_training_steps = total_steps)


def accuracy(y_true, logits):
    """Return the next-token prediction accuracy for a batch.

    Args:
        y_true: Integer token ids of shape (batch, seq_len).
        logits: Model scores of shape (batch, seq_len, vocab_size).

    Returns:
        A zero-dimensional NumPy array holding the fraction of positions
        where the arg-max prediction at step t equals the token at step t + 1.
    """
    predicted = torch.argmax(logits, dim=-1)
    # The prediction made at position t targets the token at position t + 1,
    # so the shift has to be applied along the sequence axis (axis 1), not
    # along the batch axis.
    hits = y_true[:, 1:] == predicted[:, :-1]
    return torch.mean(hits.float()).detach().cpu().numpy()

In [None]:
def prep_tensors(x, i, batch_size=batch_size, max_len=max_len):
    """Return the i-th batch of token ids as a tensor on the active device.

    Args:
        x: One-dimensional NumPy array of token ids.
        i: Zero-based batch index.
        batch_size: Number of sequences per batch.
        max_len: Number of tokens per sequence.

    Returns:
        A tensor of shape (batch_size, max_len) placed on `device`.
    """
    tokens_per_batch = batch_size * max_len
    start = i * tokens_per_batch
    window = x[start:start + tokens_per_batch]
    return torch.tensor(window.reshape(batch_size, max_len)).to(device)


# Fine-tuning loop: one pass over the training batches followed by one pass
# over the validation batches per epoch. The input ids double as labels.
for epoch in range(1, epochs+1):
    print(f'epoch {epoch}/{epochs} : training')

    train_loss = []
    train_acc = []
    model.train()
    pbar = tqdm(range(n_train))
    for i in pbar:
        batch_ids = prep_tensors(train, i)

        model.zero_grad()
        # Read the outputs by name: unpacking `.values()` positionally breaks
        # as soon as the number of returned fields changes (for example when
        # caching is disabled or hidden states are requested).
        outputs = model(batch_ids,
                        token_type_ids=None,
                        labels=batch_ids)
        loss, logits = outputs.loss, outputs.logits

        loss.backward()
        # Clip the gradient norm before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()

        train_loss.append(loss.item())
        train_acc.append(accuracy(batch_ids, logits))
        # Show running means over the batches seen so far in this epoch.
        pbar.set_description(f'acc {np.mean(train_acc):.4f} loss {np.mean(train_loss):.4f}', refresh=True)

    print(f'epoch {epoch}/{epochs} : validation')
    model.eval()
    val_acc = []
    val_loss = []
    pbar = tqdm(range(n_test))
    for i in pbar:
        batch_ids = prep_tensors(test, i)
        # No gradients are needed for evaluation.
        with torch.no_grad():
            outputs = model(batch_ids,
                            token_type_ids=None,
                            labels=batch_ids)
        loss, logits = outputs.loss, outputs.logits

        val_loss.append(loss.item())
        val_acc.append(accuracy(batch_ids, logits))
        pbar.set_description(f'acc {np.mean(val_acc):.4f} loss {np.mean(val_loss):.4f}', refresh=True)


In [None]:
def generate(prompt, len_gen=20, temperature=1):
    """Sample a continuation of `prompt` one token at a time.

    Args:
        prompt: Text to continue.
        len_gen: Number of tokens to sample.
        temperature: Divisor applied to the logits before sampling; must be
            non-zero.

    Returns:
        The decoded prompt followed by the sampled tokens.
    """
    generated = tokenizer.encode(prompt)
    context = torch.tensor([generated]).to(device)
    past = None

    # Inference only: with autograd enabled, the cached keys/values would keep
    # the graph of every previous step alive for the whole loop.
    with torch.no_grad():
        for _ in tqdm(range(len_gen)):
            # Access the fields by name rather than unpacking `.values()`,
            # which depends on how many fields the model returns.
            outputs = model(context, past_key_values=past)
            past = outputs.past_key_values
            next_logits = outputs.logits[..., -1, :] / temperature
            token = torch.distributions.Categorical(logits=next_logits).sample()

            generated += token.tolist()
            # Only the new token is fed next; the earlier context is carried
            # by `past`.
            context = token.unsqueeze(0)

    return tokenizer.decode(generated)

In [None]:
def gtp_space(text):
    """Generate a continuation of `text` and tidy it into whole sentences.

    Args:
        text: Prompt to continue.

    Returns:
        The generated text on a single line, cut after its last period (left
        uncut when it contains no period) and with the first word of every
        sentence capitalized.
    """
    prompt = tokenizer.encode(text, return_tensors='pt').to(device)
    out = model.generate(
        input_ids=prompt,
        max_length=200,
        num_beams=6,
        do_sample=True,
        temperature=1.,
        top_k=50,
        top_p=0.7,
        no_repeat_ngram_size=4,
        num_return_sequences=1,
    ).cpu().numpy()

    # A single sequence is requested, so the last row is the only one.
    text_gen = textwrap.fill(tokenizer.decode(out[-1]), 120)
    text_gen = text_gen.replace('\xa0', ' ')
    text_gen = text_gen.replace('\n', ' ')

    # Drop the unfinished trailing sentence. When there is no period at all,
    # keep the text as is instead of collapsing it to an empty string.
    last_period = text_gen.rfind('.')
    if last_period != -1:
        text_gen = text_gen[:last_period + 1]

    # Capitalize the first word of each sentence.
    # NOTE: str.capitalize() also lowercases the rest of that word.
    text_gen = re.sub(r"(\.\s+|^)(\w+)",
                      lambda m: m.group(1) + m.group(2).capitalize(),
                      text_gen)
    return text_gen

In [None]:
# Quick manual check: generate a continuation for a sample prompt.
# NOTE: this rebinds `text`, which held the corpus earlier in the notebook.
text = 'Снизошел'
gtp_space(text)

In [None]:
# Persist only the model weights (state dict), not the whole model object.
PATH = 'modelpop.pt'
torch.save(model.state_dict(), PATH)

## Inference

In [None]:
# Checkpoint to load; must be the file written by torch.save() in the training
# section, which saves to 'modelpop.pt'.
PATH = 'modelpop.pt'

In [None]:
import torch
import textwrap
import numpy as np
import re
import textwrap
from transformers import GPT2LMHeadModel, AdamW
from transformers import GPT2Tokenizer

# Uncomment to force CPU inference:
# device = 'cpu'
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Build the model from the published checkpoint name, passing the weights
# stored at PATH as the state dict; the tensors are mapped onto `device` while
# loading.
model = GPT2LMHeadModel.from_pretrained(
    'sberbank-ai/rugpt3medium_based_on_gpt2',
    output_attentions = False,
    output_hidden_states = False,
    state_dict=torch.load(PATH, map_location=torch.device(device))
)
model.to(device)

# Load the tokenizer published under the same checkpoint name.
tokenizer = GPT2Tokenizer.from_pretrained('sberbank-ai/rugpt3medium_based_on_gpt2')

def generate(prompt, len_gen=20, temperature=1):
    """Sample a continuation of `prompt` one token at a time.

    Args:
        prompt: Text to continue.
        len_gen: Number of tokens to sample.
        temperature: Divisor applied to the logits before sampling; must be
            non-zero.

    Returns:
        The decoded prompt followed by the sampled tokens.
    """
    generated = tokenizer.encode(prompt)
    context = torch.tensor([generated]).to(device)
    past = None

    # Inference only: with autograd enabled, the cached keys/values would keep
    # the graph of every previous step alive for the whole loop.
    with torch.no_grad():
        for _ in range(len_gen):
            # Access the fields by name rather than unpacking `.values()`,
            # which depends on how many fields the model returns.
            outputs = model(context, past_key_values=past)
            past = outputs.past_key_values
            next_logits = outputs.logits[..., -1, :] / temperature
            token = torch.distributions.Categorical(logits=next_logits).sample()
            generated += token.tolist()
            # Only the new token is fed next; the earlier context is carried
            # by `past`.
            context = token.unsqueeze(0)

    return tokenizer.decode(generated)

def gtp_space(text):
    """Generate a continuation of `text` and tidy it into whole sentences.

    Args:
        text: Prompt to continue.

    Returns:
        The generated text on a single line, cut after its last period (left
        uncut when it contains no period) and with the first word of every
        sentence capitalized.
    """
    prompt = tokenizer.encode(text, return_tensors='pt').to(device)
    with torch.no_grad():
        out = model.generate(
            input_ids=prompt,
            max_length=200,
            num_beams=6,
            do_sample=True,
            temperature=1.,
            top_k=50,
            top_p=0.7,
            no_repeat_ngram_size=4,
            num_return_sequences=1,
        ).cpu().numpy()

    # A single sequence is requested, so the last row is the only one.
    text_gen = textwrap.fill(tokenizer.decode(out[-1]), 120)
    text_gen = text_gen.replace('\xa0', ' ')
    text_gen = text_gen.replace('\n', ' ')

    # Drop the unfinished trailing sentence. When there is no period at all,
    # keep the text as is instead of collapsing it to an empty string.
    last_period = text_gen.rfind('.')
    if last_period != -1:
        text_gen = text_gen[:last_period + 1]

    # Capitalize the first word of each sentence.
    # NOTE: str.capitalize() also lowercases the rest of that word.
    text_gen = re.sub(r"(\.\s+|^)(\w+)",
                      lambda m: m.group(1) + m.group(2).capitalize(),
                      text_gen)
    return text_gen


In [None]:
# Quick manual check of the reloaded model on a sample prompt.
text = 'Послание'
gtp_space(text)

## Вариант локального бота для Telegram

In [None]:
import nest_asyncio
nest_asyncio.apply()
from pathlib import Path
import os
import time

import numpy as np
from aiogram import Bot, types
from aiogram.dispatcher import Dispatcher
from aiogram.utils import executor
from aiogram.types import ContentType, File, Message
from bs4 import BeautifulSoup
from subprocess import call
import requests

In [None]:
TOKEN = ""  # Bot token (obtained from @BotFather)

# Fall back to the TOKEN environment variable when no token is hard-coded
# above; the value returned by os.getenv() has to be assigned to take effect.
if not TOKEN:
    TOKEN = os.getenv("TOKEN", "")

# Bot initialisation
bot = Bot(token=TOKEN)
dp = Dispatcher(bot)

@dp.message_handler(commands=['start'])
async def start(message: types.Message):
    """Greet the sender of /start and report their Telegram user id."""
    sender = message.from_user
    user_name, user_id = sender.full_name, sender.id
    await message.reply(f'Hello {user_name}! Your user_id = {user_id}')

@dp.message_handler()
async def echo(message: types.Message):
    """Reply to an incoming message with text generated from its content."""
    # gtp_space is an ordinary synchronous call, so this handler does not
    # yield control until generation has finished.
    await message.reply(gtp_space(message.text))

# Start the bot: poll for updates using the dispatcher defined above.
if __name__ == '__main__':
    executor.start_polling(dp)