In [None]:
from collections import Counter
from torch.utils.data import Dataset, DataLoader
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from pathlib import Path
from nltk.corpus import stopwords
import string
import json
import re
import pandas as pd, numpy as np
import matplotlib.pyplot as plt
import os, urllib, itertools, shutil, random
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix
import PIL
import torch
from pathlib import Path

In [None]:
!unzip Poetry.zip -d myfiles

Archive:  Poetry.zip
   creating: myfiles/Poetry/
  inflating: myfiles/Poetry/1.txt    
  inflating: myfiles/Poetry/10.txt   
  inflating: myfiles/Poetry/11.txt   
  inflating: myfiles/Poetry/12.txt   
  inflating: myfiles/Poetry/19th_oct.txt  
  inflating: myfiles/Poetry/22.txt   
  inflating: myfiles/Poetry/23.txt   
  inflating: myfiles/Poetry/25.txt   
  inflating: myfiles/Poetry/3.txt    
  inflating: myfiles/Poetry/33.txt   
  inflating: myfiles/Poetry/4.txt    
  inflating: myfiles/Poetry/5.txt    
  inflating: myfiles/Poetry/6.txt    
  inflating: myfiles/Poetry/7.txt    
  inflating: myfiles/Poetry/8.txt    
  inflating: myfiles/Poetry/9.txt    
  inflating: myfiles/Poetry/about_a_fish.txt  
  inflating: myfiles/Poetry/about_oleg.txt  
  inflating: myfiles/Poetry/alex1.txt  
  inflating: myfiles/Poetry/anchar.txt  
  inflating: myfiles/Poetry/aughtam.txt  
  inflating: myfiles/Poetry/beglets.txt  
  inflating: myfiles/Poetry/blue_mount_kavkaz.txt  
  inflating: myfiles/Poetry/

In [None]:
# !unzip poems.csv.zip -d myfiles

Archive:  poems.csv.zip
replace myfiles/poems.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: A
  inflating: myfiles/poems.csv       


In [None]:
# Load the poems table from the extracted CSV.
# NOTE(review): `mydf` is not referenced by any other visible cell, and the cell that
# extracts poems.csv is commented out — confirm this load is still needed.
mydf = pd.read_csv("/content/myfiles/poems.csv")

In [None]:
# Root directory of the extracted poem corpus.
TRAIN_DIR = Path('/content/myfiles/Poetry/')
# Sorted so that the file order does not depend on filesystem enumeration order.
files_list = sorted(TRAIN_DIR.rglob('*.txt'))

In [None]:
class PoetryDataset(Dataset):
  """Character-level dataset over a collection of poem text files.

  Every file is one sample. A file is tokenised into single characters, with the
  marker token "EOL" appended after each line. Each sample is returned as a pair of
  index sequences shifted by one position (next-token prediction).

  Args:
    files: sequence of paths to UTF-8 text files.
    mode: stored on the instance as ``self.mode``; not used by the class itself.
    vocab_path: JSON file caching the token -> index mapping. It is created from
      ``files`` when it does not exist yet and is reused as-is when it does.
  """

  def __init__(self, files, mode, vocab_path='word_to_index.json'):
    super().__init__()
    self.files = files
    self.mode = mode
    self.vocab_path = vocab_path

    # The vocabulary is cached on disk: build it only on the first run, then
    # always read it back from the JSON file.
    if not os.path.exists(self.vocab_path):
      self.create_word_to_index_dict()
    with open(self.vocab_path, 'r', encoding='utf-8') as f:
      self.word_to_index_dict = json.load(f)

  def __len__(self):
    """Number of samples, i.e. number of files."""
    return len(self.files)

  def loadtxt(self, file):
    """Return the full content of ``file`` decoded as UTF-8."""
    with open(file, 'r', encoding='utf-8') as text:
      return text.read()

  def preprocess_for_w2vec(self, content):
    """Split ``content`` into character tokens, adding "EOL" after every line."""
    tokens = []
    for line in content.splitlines():
      # Each character of the line (whitespace included) is its own token.
      tokens.extend(line)
      tokens.append("EOL")
    return tokens

  def create_word_to_index_dict(self):
    """Build the token -> index mapping from all files and write it to ``vocab_path``.

    Tokens are numbered in order of first appearance; "PAD" receives the last index.

    Returns:
      The mapping that was written.
    """
    word_to_index = {}
    for file in self.files:
      for token in self.preprocess_for_w2vec(self.loadtxt(file)):
        # Assign the next free index the first time a token is seen.
        word_to_index.setdefault(token, len(word_to_index))

    word_to_index["PAD"] = len(word_to_index)
    with open(self.vocab_path, 'w', encoding='utf-8') as f:
      json.dump(word_to_index, f)

    return word_to_index

  def __getitem__(self, idx):
    """Return ``{"inputs": ..., "targets": ...}`` for file number ``idx``.

    Both values are 1-D ``torch.long`` tensors; ``targets`` is ``inputs`` shifted
    one token to the left.

    Raises:
      KeyError: if the file contains a token that is absent from the cached
        vocabulary (e.g. the cache was built from a different set of files).
    """
    tokens = self.preprocess_for_w2vec(self.loadtxt(self.files[idx]))
    try:
      indices = [self.word_to_index_dict[token] for token in tokens]
    except KeyError as err:
      raise KeyError(
          f"token {err.args[0]!r} from {self.files[idx]} is missing from the "
          f"vocabulary {self.vocab_path!r}; delete that file to rebuild it"
      ) from None

    # Explicit dtype keeps empty files from producing float tensors.
    return {"inputs": torch.tensor(indices[:-1], dtype=torch.long),
            "targets": torch.tensor(indices[1:], dtype=torch.long)}

In [None]:
# Sorted so the file order — and therefore the token indices of a freshly built
# vocabulary — does not depend on filesystem enumeration order.
train_val_files = sorted(TRAIN_DIR.rglob('*.txt'))
train_dataset = PoetryDataset(train_val_files, mode='train')

In [None]:
# Read the cached token -> index vocabulary back from disk.
with open('word_to_index.json', 'r') as vocab_file:
    loaded_dict = json.loads(vocab_file.read())

In [None]:
# Inverse vocabulary: index -> token.
index_to_word = dict(zip(loaded_dict.values(), loaded_dict.keys()))

In [None]:
# Number of entries in the vocabulary loaded from word_to_index.json.
len(loaded_dict)

118

In [None]:
def evaluate(model, char_to_idx, idx_to_char, start_text=' ', prediction_len=200, temp=0.3):
    """Generate text by sampling from the model one token at a time.

    Args:
        model: module exposing ``init_hidden()`` and ``model(x, hidden) -> (logits, hidden)``,
            where ``x`` is a LongTensor of shape (seq_len, 1, 1).
        char_to_idx: mapping token -> index; every character of ``start_text`` must be a key.
        idx_to_char: mapping index -> token.
        start_text: non-empty prompt that seeds the hidden state.
        prediction_len: number of tokens to sample after the prompt.
        temp: softmax temperature; lower values make sampling more greedy.

    Returns:
        ``start_text`` followed by the sampled text, with the "EOL" token rendered as a newline.

    Raises:
        ValueError: if ``start_text`` is empty.
        KeyError: if ``start_text`` contains a character missing from ``char_to_idx``.
    """
    if not start_text:
        raise ValueError("start_text must contain at least one character")

    # Run on the device the model lives on rather than relying on a global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    hidden = model.init_hidden()
    idx_input = [char_to_idx[char] for char in start_text]
    train = torch.LongTensor(idx_input).view(-1, 1, 1).to(run_device)
    predicted_text = start_text
    # "PAD" is a padding marker, not text; it must never be sampled.
    pad_index = char_to_idx.get('PAD')

    with torch.no_grad():
        # Warm up on everything except the last prompt character; that character is the
        # first input of the sampling loop, so feeding it here too would process it twice.
        if train.size(0) > 1:
            _, hidden = model(train[:-1], hidden)

        inp = train[-1].view(-1, 1, 1)

        for _ in range(prediction_len):
            output, hidden = model(inp, hidden)
            output_logits = output.detach().cpu().view(-1).to(dtype=torch.float64, copy=True)
            if pad_index is not None:
                output_logits[pad_index] = float('-inf')
            # float64 plus renormalisation keeps np.random.choice's sum-to-1 check from failing.
            p_next = F.softmax(output_logits / temp, dim=-1).numpy()
            p_next = p_next / p_next.sum()
            top_index = int(np.random.choice(len(p_next), p=p_next))
            inp = torch.LongTensor([top_index]).view(-1, 1, 1).to(run_device)
            predicted_char = idx_to_char[top_index]
            if predicted_char == 'EOL':  # end-of-line marker -> real newline
                predicted_text += '\n'
            else:
                predicted_text += predicted_char

    return predicted_text

In [None]:
class PoetryLSTM(nn.Module):
  """Embedding -> multi-layer LSTM -> dropout -> linear projection back to the vocabulary.

  Args:
    input_size: vocabulary size; also the size of the output logits.
    hidden_size: LSTM hidden state size.
    embedding_size: dimensionality of the token embeddings.
    n_layers: number of stacked LSTM layers.
    dropout: dropout probability applied to the LSTM outputs.
  """

  def __init__(self, input_size, hidden_size, embedding_size, n_layers, dropout=0.2):
    super().__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.embedding_size = embedding_size
    self.n_layers = n_layers

    self.embedding = nn.Embedding(self.input_size, self.embedding_size)
    self.lstm = nn.LSTM(input_size=self.embedding_size,
                        hidden_size=self.hidden_size,
                        num_layers=self.n_layers)
    self.dropout = nn.Dropout(dropout)
    self.fc_out = nn.Linear(self.hidden_size, self.input_size)

  def forward(self, x, hidden):
    """Run one forward pass.

    Args:
      x: LongTensor of token indices; dimension 2 is squeezed away after the embedding
        lookup, so a (seq_len, batch, 1) input becomes (seq_len, batch, embedding_size).
      hidden: ``(h0, c0)`` tuple as returned by ``init_hidden``.

    Returns:
      ``(logits, (h_n, c_n))`` where ``logits`` are unnormalised scores over the vocabulary.
    """
    embedded_x = self.embedding(x).squeeze(2)
    lstm_out, (ht0, ct0) = self.lstm(embedded_x, hidden)
    lstm_out = self.dropout(lstm_out)
    linear_layer_out_before_softmax = self.fc_out(lstm_out)

    return linear_layer_out_before_softmax, (ht0, ct0)

  def init_hidden(self, batch_size=1):
    """Return zero-initialised ``(h0, c0)``, each of shape (n_layers, batch_size, hidden_size).

    The tensors are created on the device of the model's own weights, so the method
    keeps working after ``model.to(...)`` and does not rely on a global ``device``.
    """
    weight = self.embedding.weight
    shape = (self.n_layers, batch_size, self.hidden_size)
    return (torch.zeros(shape, dtype=weight.dtype, device=weight.device),
            torch.zeros(shape, dtype=weight.dtype, device=weight.device))

In [None]:
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence

In [None]:
def custom_collate(batch, max_seq_len=1000, pad_index=None):
  """Collate variable-length samples into padded batch tensors.

  Args:
    batch: list of dicts with 1-D tensors under "inputs" and "targets".
    max_seq_len: sequences longer than this are truncated.
    pad_index: value used for padding; defaults to ``loaded_dict["PAD"]``.

  Returns:
    Dict with "inputs" and "targets" of shape (batch, seq_len, 1) and a float
    "mask" of shape (batch, seq_len) that is 1.0 at real targets and 0.0 at padding.
  """
  if pad_index is None:
    pad_index = loaded_dict["PAD"]

  # Truncate first so the mask is built from the same tensors that are returned.
  inputs_batch_list = [item["inputs"][:max_seq_len] for item in batch]
  targets_batch_list = [item["targets"][:max_seq_len] for item in batch]

  padded_inputs = pad_sequence(inputs_batch_list, batch_first=True, padding_value=pad_index)
  padded_targets = pad_sequence(targets_batch_list, batch_first=True, padding_value=pad_index)
  mask = (padded_targets != pad_index).float()

  return {"inputs": padded_inputs.unsqueeze(-1),
          "targets": padded_targets.unsqueeze(-1),
          "mask": mask}

In [None]:
# Batch size passed to the training DataLoader.
batch_size = 12

In [None]:
# Shuffled mini-batches; custom_collate pads the variable-length samples.
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=custom_collate,
)

In [None]:
# Prefer the GPU when one is available.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = PoetryLSTM(input_size=len(index_to_word), hidden_size=128, embedding_size=128, n_layers=2)
model.to(device)

# Padded target positions are excluded from the loss; otherwise the many padding
# targets in every batch dominate the objective and deflate the reported loss.
criterion = nn.CrossEntropyLoss(ignore_index=loaded_dict["PAD"])
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2, amsgrad=True)
# Halve the learning rate when the monitored loss has not improved for `patience` steps.
# NOTE(review): `verbose` is deprecated in recent PyTorch releases — confirm against
# the installed version.
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    patience=5,
    verbose=True,
    factor=0.5
)

n_epochs = 2000
# Running list of batch losses, averaged and reset by the training loop.
loss_avg = []



In [None]:
# Training loop: one optimisation step per batch; every 50 batches report the mean
# loss, step the LR scheduler and print a generated sample.
for epoch in range(n_epochs):
    print(epoch)
    for batch in train_loader:
        # evaluate() below switches to eval mode, so re-enable training mode each batch.
        model.train()
        train, target = batch["inputs"], batch["targets"]

        # (batch, seq, 1) -> (seq, batch, 1): the LSTM is not batch-first.
        train = train.permute(1, 0, 2).to(device).contiguous()
        target = target.permute(1, 0, 2).to(device).contiguous()

        # The last batch may be smaller than the configured size. A separate name is
        # used so the notebook-level `batch_size` setting is not overwritten.
        current_batch_size = train.size(1)
        hidden = model.init_hidden(current_batch_size)

        optimizer.zero_grad()
        output, hidden = model(train, hidden)
        # CrossEntropyLoss expects (batch, classes, seq) logits and (batch, seq) targets.
        loss = criterion(output.permute(1, 2, 0), target.squeeze(-1).permute(1, 0))

        loss.backward()
        optimizer.step()

        loss_avg.append(loss.item())
        if len(loss_avg) >= 50:
            mean_loss = np.mean(loss_avg)
            print(f'Loss: {mean_loss}')
            scheduler.step(mean_loss)
            loss_avg = []
            model.eval()
            predicted_text = evaluate(model, loaded_dict, index_to_word)
            print(predicted_text)


[1;30;43mВыходные данные были обрезаны до нескольких последних строк (5000).[0m
174
Loss: 0.5429088598489762
 творенья —
И долго слава — от сердце глуши трудно.
И вымолвить хочет на свете
Ты царевны молодой?
Я жених ее». — «Постой, ни родит
Не отдаленный путь сказать свой мучии
Мечло молво и постигнуть не воз
175
176
177
178
179
Loss: 0.5401284652948379
 анге сердительный полны,
И над порока меч вести,
До прочив тоской забавы,
Как сило пользы видит, ни народ,
И главы не подобный лесов
Прекрасно повудного царяй;
Вот вышет безумных рек.
Поверьтенья не с
180
181
182
183
184
Loss: 0.541507169008255
 улабою простится,
Повод ним кого не столбой
Он давно пришел, когда вас больше не одна.
Не вижу меня верною ветви лаская;
С тени скалодный перед напрями мечтам.
Они польность сладостется в сердца таин…
185
186
187
188
189
Loss: 0.5289301067590714
 анке волны,
Так мучине пропал, пылая обман…
Шуми, шуми по небу
Стобою далекой прости!
Согреты под собой в тихо мне,
И вижу мило прибрасной любовью,


In [None]:
# Switch off dropout, then sample 250 tokens continuing the given prompt.
model.eval()

print(evaluate(model, loaded_dict, index_to_word, 'дышал как нелюдь  ', 250, 0.3))

дышал как нелюдь  Мостом
Трель, коня неземных усенья но не цвету она
В тени так зенит воздеться конь,
Но верными руком.
Всё пускава тений обо мгла
На поля поэта лит; не ветвя,
Что стол бога страстью светлый зашар,
Когда бы ние всю славу
Мы стол больною лебежден.
В тел
