In [1]:
import torch
from torch import nn
import re
import random
import tqdm
import time

In [2]:
with open('data/nietzsche.txt', encoding='utf-8') as f:
    text = f.read().lower()
print('length:', len(text))
text = re.sub('[^a-z ]', ' ', text)
text = re.sub(r'\s+', ' ', text)

length: 600893


In [10]:
text[:100]

'preface supposing that truth is a woman what then is there not ground for suspecting that all philos'

Составим алфавит кодирофки символов 

In [13]:
INDEX_TO_CHAR = sorted(list(set(text)))
CHAR_TO_INDEX = {c: i for i, c in enumerate(INDEX_TO_CHAR)}

In [14]:
CHAR_TO_INDEX

{' ': 0,
 'a': 1,
 'b': 2,
 'c': 3,
 'd': 4,
 'e': 5,
 'f': 6,
 'g': 7,
 'h': 8,
 'i': 9,
 'j': 10,
 'k': 11,
 'l': 12,
 'm': 13,
 'n': 14,
 'o': 15,
 'p': 16,
 'q': 17,
 'r': 18,
 's': 19,
 't': 20,
 'u': 21,
 'v': 22,
 'w': 23,
 'x': 24,
 'y': 25,
 'z': 26}

In [15]:
MAX_LEN = 40
STEP = 3
SENTENCES = []
NEXT_CHARS = []
for i in range(0, len(text) - MAX_LEN, STEP):
    SENTENCES.append(text[i: i + MAX_LEN])
    NEXT_CHARS.append(text[i + MAX_LEN])
print('Num sents:', len(SENTENCES))

Num sents: 193075


In [16]:
print('Vectorization...')
X = torch.zeros((len(SENTENCES), MAX_LEN), dtype=int)
Y = torch.zeros((len(SENTENCES)), dtype=int)
for i, sentence in enumerate(SENTENCES):
    for t, char in enumerate(sentence):
        X[i, t] = CHAR_TO_INDEX[char]
    Y[i] = CHAR_TO_INDEX[NEXT_CHARS[i]]

Vectorization...


In [17]:
X[0:1], Y[0]

(tensor([[16, 18,  5,  6,  1,  3,  5,  0, 19, 21, 16, 16, 15, 19,  9, 14,  7,  0,
          20,  8,  1, 20,  0, 20, 18, 21, 20,  8,  0,  9, 19,  0,  1,  0, 23, 15,
          13,  1, 14,  0]]),
 tensor(23))

In [18]:
BATCH_SIZE= 256
dataset = torch.utils.data.TensorDataset(X, Y)
data = torch.utils.data.DataLoader(dataset, BATCH_SIZE, shuffle=True)

In [19]:
class NeuralNetwork(nn.Module):
    def __init__(self, rnnClass, dictionary_size, embedding_size, num_hiddens, num_classes):
        super().__init__()
        
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(dictionary_size, embedding_size)
        self.hidden = rnnClass(embedding_size, num_hiddens, batch_first=True)
        self.output = nn.Linear(num_hiddens, num_classes)
        
    def forward(self, X):
        out = self.embedding(X)
        _, state = self.hidden(out)
        predictions = self.output(state[0].squeeze())
        return predictions

In [20]:
model = NeuralNetwork(nn.LSTM, len(CHAR_TO_INDEX), 64, 128, len(CHAR_TO_INDEX))

In [23]:
def sample(preds):
    softmaxed = torch.softmax(preds, 0)
    probas = torch.distributions.multinomial.Multinomial(1, softmaxed).sample()
    return probas.argmax()

def generate_text():
    start_index = random.randint(0, len(text) - MAX_LEN - 1)

    generated = ''
    sentence = text[start_index: start_index + MAX_LEN]
    generated += sentence


    for i in range(MAX_LEN):
        x_pred = torch.zeros((1, MAX_LEN), dtype=int)
        for t, char in enumerate(generated[-MAX_LEN:]):
            x_pred[0, t] = CHAR_TO_INDEX[char]

        preds = model(x_pred).cpu()
        next_char = INDEX_TO_CHAR[sample(preds)]
        generated = generated + next_char
    print(generated[:MAX_LEN] + '|' + generated[MAX_LEN:])

generate_text()

ur through the many finer and coarser mo|wrxbbefntpyphzojgvvol ufizvizzkdosbvxbjt


In [25]:
model = NeuralNetwork(nn.LSTM, len(CHAR_TO_INDEX), 64, 128, len(CHAR_TO_INDEX))
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

In [28]:
model

NeuralNetwork(
  (embedding): Embedding(27, 64)
  (hidden): LSTM(64, 128, batch_first=True)
  (output): Linear(in_features=128, out_features=27, bias=True)
)

In [30]:
for ep in range(100):
    start = time.time()
    train_loss = 0.
    train_passed = 0

    model.train()
    for X_b, y_b in data:
        X_b, y_b = X_b, y_b
        optimizer.zero_grad()
        answers = model(X_b)
        loss = criterion(answers, y_b)
        train_loss += loss.item()

        loss.backward()
        optimizer.step()
        train_passed += 1

    print("Epoch {}. Time: {:.3f}, Train loss: {:.3f}".format(ep, time.time() - start, train_loss / train_passed))
    model.eval()
    generate_text()

Epoch 0. Time: 12.462, Train loss: 1.194
ss of its advocates than by anything els|e and in the same thes are comes s maste
Epoch 1. Time: 11.521, Train loss: 1.186
nd peoples must not be estimated by our |faith and judgmes seals understand speci
Epoch 2. Time: 11.435, Train loss: 1.177
d and evil spirits wage war with varying| permacters he is a hone of lacking it a
Epoch 3. Time: 12.464, Train loss: 1.171
t this contest should always be kept up |so germans however among to be forgages 
Epoch 4. Time: 11.487, Train loss: 1.165
 who cut ruthlessly into his own flesh a|nd because since super who dour at als t
Epoch 5. Time: 11.463, Train loss: 1.157
they will not only have a smile but a ge|neral equalitues motelour stand is a rul
Epoch 6. Time: 11.369, Train loss: 1.151
en a sheep for a hero is it so extraordi|nary compusies houses to higher more fre
Epoch 7. Time: 11.461, Train loss: 1.145
urally suffer in all their scientific ti|mes have though stand within they have n
Epoch 8. Time: 1

 # Формирование предсказания y  по послежовательности x

 Сначала генерируем случайные последовательности по x и по ним сформируем по определенному алгоритцу значение у И попробуем построить модель  

 Длина последовательности -5
 

In [31]:
COUNT_TRAIN = 1000
COUNT_TEST = 100
ARRAY_SIZE = 5
def generation_data(len_data:int):
    cifre = list(range(0, 9))
    X, y = [], []
    for item in range(len_data):
        x_item = random.sample(list(range(0, 9)), ARRAY_SIZE)

        X.append(x_item)
        y_add = []
        for i,  x_ in enumerate(x_item):
            if i == 0:
                y_add.append(x_)
                continue

            y_ = x_ + x_item[0]

            if y_ >= 10:
                y_ -= 10
            y_add.append(y_)
        y.append(y_add)
    return torch.tensor(X), torch.tensor(y)

X_train, y_train = generation_data(COUNT_TRAIN)
X_test, y_test = generation_data(COUNT_TEST)

In [32]:
X_train[0]


tensor([4, 2, 6, 8, 1])

In [33]:
y_train[0]

tensor([4, 6, 0, 2, 5])

In [35]:
class NeuralNetwork(nn.Module):
    def __init__(self, dictionary_size, embedding_size, num_hiddens):
        super().__init__()

        self.embedding = nn.Embedding(dictionary_size, embedding_size)
        self.hidden = nn.LSTM(embedding_size, num_hiddens, batch_first=True)
        self.output = nn.Linear(num_hiddens, 1)

    def forward(self, X):
        out = self.embedding(X)

        x, _ = self.hidden(out)
        predictions = self.output(x)
        return predictions

In [36]:
import numpy as np
import torch.optim as optim
import torch.utils.data as data

loader = data.DataLoader(data.TensorDataset(X_train, y_train), shuffle=True, batch_size=8)

model = NeuralNetwork(10, 30, 54  )
optimizer = optim.Adam(model.parameters())
loss_fn = nn.MSELoss()

In [37]:
for epoch in range(500):
    model.train()
    for X_batch, y_batch in loader:
        y_pred = model(X_batch)

        y_batch = y_batch.flatten().to(dtype=torch.float32)
        y_pred = y_pred.squeeze().flatten().to(dtype=torch.float32)

        loss = loss_fn(y_pred, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Validation
    if epoch % 50 != 0:
        continue
    model.eval()

    with torch.no_grad():
        y_pred = model(X_train)
        y_pred = y_pred.squeeze().flatten().to(dtype=torch.float32)
        y_train_f = y_train.flatten().to(dtype=torch.float32)
        train_rmse = np.sqrt(loss_fn(y_pred, y_train_f))

        y_pred = model(X_test)
        y_pred = y_pred.squeeze().flatten().to(dtype=torch.float32)
        y_test_f = y_test.flatten().to(dtype=torch.float32)
        test_rmse = np.sqrt(loss_fn(y_pred, y_test_f))
    print("Epoch %d: train RMSE %.4f, test RMSE %.4f" % (epoch, train_rmse, test_rmse))

Epoch 0: train RMSE 3.0059, test RMSE 2.9829
Epoch 50: train RMSE 0.0624, test RMSE 0.1013
Epoch 100: train RMSE 0.0323, test RMSE 0.0552
Epoch 150: train RMSE 0.0509, test RMSE 0.0634
Epoch 200: train RMSE 0.0243, test RMSE 0.0404
Epoch 250: train RMSE 0.0159, test RMSE 0.0379
Epoch 300: train RMSE 0.0134, test RMSE 0.0323
Epoch 350: train RMSE 0.0171, test RMSE 0.0303
Epoch 400: train RMSE 0.0104, test RMSE 0.0278
Epoch 450: train RMSE 0.0117, test RMSE 0.0268


In [47]:
X_test[10]

tensor([0, 4, 1, 8, 3])

In [48]:
y_test[10]

tensor([0, 4, 1, 8, 3])

In [49]:
y_pre = model(X_test[10])
y_pre = y_pre.squeeze().flatten().tolist()
y_pre = [ round(item,0) for item in y_pre]
y_pre

[-0.0, 4.0, 1.0, 8.0, 3.0]

### Вывод модель строена  - она достаточно точно предсказывает поведение y
Попробуем другую модель - без  слоя Embedding

In [69]:
class ModelLST(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=2, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = x.to(dtype=torch.float32)
        x, _ = self.lstm(x)
        x = self.linear(x)
        return x

In [70]:
model = ModelLST(5, 50, 5  )
optimizer = optim.Adam(model.parameters())
loss_fn = nn.MSELoss()
loader = data.DataLoader(data.TensorDataset(X_train, y_train), shuffle=True, batch_size=8)

In [71]:
for epoch in range(500):
    model.train()
    for X_batch, y_batch in loader:
        y_pred = model(X_batch.to(dtype=torch.float32))

        y_batch = y_batch.flatten().to(dtype=torch.float32)
        y_pred = y_pred.squeeze().flatten().to(dtype=torch.float32)

        loss = loss_fn(y_pred, y_batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Validation
    if epoch % 50 != 0:
        continue
    model.eval()

    with torch.no_grad():
        y_pred = model(X_train)
        y_pred = y_pred.squeeze().flatten().to(dtype=torch.float32)
        y_train_f = y_train.flatten().to(dtype=torch.float32)
        train_rmse = np.sqrt(loss_fn(y_pred, y_train_f))

        y_pred = model(X_test)
        y_pred = y_pred.squeeze().flatten().to(dtype=torch.float32)
        y_test_f = y_test.flatten().to(dtype=torch.float32)
        test_rmse = np.sqrt(loss_fn(y_pred, y_test_f))
    print("Epoch %d: train RMSE %.4f, test RMSE %.4f" % (epoch, train_rmse, test_rmse))

Epoch 0: train RMSE 2.8372, test RMSE 2.8621
Epoch 50: train RMSE 1.5798, test RMSE 1.6401
Epoch 100: train RMSE 0.7683, test RMSE 0.7794
Epoch 150: train RMSE 0.3582, test RMSE 0.3642
Epoch 200: train RMSE 0.1879, test RMSE 0.2131
Epoch 250: train RMSE 0.1447, test RMSE 0.1517
Epoch 300: train RMSE 0.2109, test RMSE 0.2109
Epoch 350: train RMSE 0.1004, test RMSE 0.1166
Epoch 400: train RMSE 0.1240, test RMSE 0.1303
Epoch 450: train RMSE 0.1127, test RMSE 0.1262


In [72]:
X_test[0:2]

tensor([[6, 1, 8, 2, 7],
        [5, 8, 7, 2, 6]])

In [73]:
y_test[0:2].flatten().tolist()

[6, 7, 4, 8, 3, 5, 3, 2, 7, 1]

In [76]:
y_pre = model(X_test[0:2])
y_pre = y_pre.squeeze().flatten().tolist()
y_pre = [ int(item) for item in y_pre]
y_pre

[6, 7, 3, 7, 3, 4, 3, 2, 6, 0]

# Выводы 
- применение LSTM для решения лекционного практического задания
- построена модели для предсказания значения y - слой  Embedding нужен ТОчность значительно возрастает! 