# Text generation with **GPT**
Модель: sberbank-ai/rugpt3medium_based_on_gpt2

In [None]:
!pip install transformers

In [None]:
import numpy as np
import pandas as pd
import re
import random

import torch
from tqdm.notebook import tqdm
import transformers

import re
import textwrap

from transformers import GPT2LMHeadModel, AdamW


PATH_TEXT = 'panorama.txt'

if torch.cuda.is_available():    
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
from transformers import GPT2Tokenizer
tokenizer = GPT2Tokenizer.from_pretrained('sberbank-ai/rugpt3small_based_on_gpt2')

In [None]:
with open(PATH_TEXT, encoding='utf8') as f:
    text = f.read()

text = re.sub('\n{2,}', '\n', text)
print(text[:400])

In [None]:
tokens = tokenizer.encode(text, add_special_tokens=True)
tokens = np.array(tokens)

In [None]:
l = len(tokens)//15
train = []
test = []
for i in range(15):
    if i%5 > 0:
        train.extend(tokens[i*l: (i+1)*l])
    else:
        test.extend(tokens[i*l: (i+1)*l])
train = np.array(train)
test = np.array(test)

print(len(tokens), len(train), len(test))

In [None]:
# download rugpt3small_based_on_gpt2 model ~526 mb
model = GPT2LMHeadModel.from_pretrained(
    'sberbank-ai/rugpt3small_based_on_gpt2',
    output_attentions = False,
    output_hidden_states = False,
)

model.to(device)

In [None]:
batch_size = 8
max_len = 256
epochs = 5

n_train = len(train)//(batch_size*max_len)
n_test = len(test)//(batch_size*max_len)
print(n_train, n_test)

optimizer = torch.optim.AdamW(model.parameters(), lr = 1e-5, eps = 1e-8)

total_steps = n_train * epochs
scheduler = transformers.get_linear_schedule_with_warmup(optimizer, 
                                            num_warmup_steps = 0, # Default value in run_glue.py
                                            num_training_steps = total_steps)


def accuracy(y_true, logits):
    return torch.mean((y_true[1:] == torch.argmax(logits, dim=2)[:-1]).float()).detach().cpu().numpy()

In [None]:
def prep_tensors(x, i, batch_size=batch_size, max_len=max_len):
    batch_ids = x[i*batch_size*max_len: (i+1)*batch_size*max_len]
    batch_ids = batch_ids.reshape(batch_size, max_len)
    batch_ids = torch.tensor(batch_ids).to(device)
    return batch_ids


for epoch in range(1, epochs+1):
    print(f'epoch {epoch}/{epochs} : training')

    train_loss = []
    train_acc = []
    model.train()
    pbar = tqdm(range(n_train))
    for i in pbar:
        batch_ids = prep_tensors(train, i)

        model.zero_grad()
        loss, logits, _ = model(batch_ids,
                             token_type_ids=None, 
                            #  attention_mask=batch_mask,
                             labels=batch_ids
                             ).values()

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()
        scheduler.step()
        
        train_loss.append(loss.item())
        train_acc.append(accuracy(batch_ids, logits))
        pbar.set_description(f'acc {np.mean(train_acc):.4f} loss {np.mean(train_loss):.4f}', refresh=True)

    
    print(f'epoch {epoch}/{epochs} : validation')
    model.eval()
    val_acc = []
    val_loss = []
    pbar = tqdm(range(n_test))
    for i in pbar:
        batch_ids = prep_tensors(test, i)
        with torch.no_grad():        
            loss, logits, _ = model(batch_ids, 
                                token_type_ids=None, 
                                # attention_mask=batch_mask,
                                labels=batch_ids
                                 ).values()
        
        val_loss.append(loss.item())
        val_acc.append(accuracy(batch_ids, logits))
        pbar.set_description(f'acc {np.mean(val_acc):.4f} loss {np.mean(val_loss):.4f}', refresh=True)


In [None]:
PATH = 'model.pt'
torch.save(model.state_dict(), PATH)

In [None]:
def generate(prompt, len_gen=20, temperature=1):
    generated = tokenizer.encode(prompt)
    context = torch.tensor([generated]).to(device)
    past = None

    for i in tqdm(range(len_gen)):
        output, past = model(context, past_key_values=past).values()
        output = output / temperature
        token = torch.distributions.Categorical(logits=output[..., -1, :]).sample()
        
        generated += token.tolist()
        context = token.unsqueeze(0)

    sequence = tokenizer.decode(generated)

    return sequence

In [None]:
def text_generator(text):
    prompt = text
    prompt = tokenizer.encode(prompt, return_tensors='pt').to(device)
    out = model.generate(
        input_ids=prompt,
        max_length=70,
        num_beams=5,
        do_sample=True,
        temperature=3.,
        top_k=50,
        top_p=0.6,
        no_repeat_ngram_size=4,
        num_return_sequences=4,
        ).cpu().numpy()
    cut = textwrap.fill(tokenizer.decode(out[0]), 120).rfind('.') + 1
    return textwrap.fill(tokenizer.decode(out[0]), 120)[:cut]

In [None]:
# test text generation
print(text_generator('Китай'))

# Telegram bot on colab

In [None]:
!pip install python-dotenv
!pip install aiogram
!pip install nest_asyncio

In [None]:
import nest_asyncio
nest_asyncio.apply()

from pathlib import Path
import os

import numpy as np
from aiogram import Bot, types
from aiogram.dispatcher import Dispatcher
from aiogram.utils import executor
from aiogram.types import ContentType, File, Message
from bs4 import BeautifulSoup
from subprocess import call
import requests

from dotenv import load_dotenv

TOKEN = "your bot token" # bot token (use @BotFather to get it)

In [None]:
load_dotenv()

# Инициализация бота
bot = Bot(token=TOKEN)
dp = Dispatcher(bot)


@dp.message_handler(commands=['start'])
async def start(message: types.Message):
    user_name = message.from_user.full_name
    user_id = message.from_user.id
    await message.reply(f'Hello {user_name}! I generate sarcastic news, just start the phrase.')

@dp.message_handler()
async def echo(message: types.Message):
    text = message.text
    await message.reply(text_generator(text))

# Start bot
if __name__ == '__main__':
    executor.start_polling(dp)
