In [None]:
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

import string
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from IPython.display import clear_output
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import ssl

ssl._create_default_https_context = ssl._create_unverified_context
%config Completer.use_jedi = False

Нам понадобятся функции для того чтобы мониторить прогресс обучения нейронной сети.

In [None]:
def plot_progress(train_losses, train_accs, test_loss, test_accs):
    clear_output(True)
    
    f, (ax1, ax2) = plt.subplots(nrows=1, ncols=2)
    f.set_figheight(6)
    f.set_figwidth(12)
    
    ax1.plot(train_losses, label='train loss')
    ax1.plot(test_loss, label='test loss')
    ax1.plot(np.zeros_like(train_losses), '--', label='zero')
    ax1.set_title('Loss')
    ax1.set_ylabel('Loss')
    ax1.set_xlabel('Batch number')
    ax1.legend()
    
    ax2.plot(train_accs, label='train accuracy')
    ax2.plot(test_accs, label='test accuracy')
    ax2.plot(np.ones_like(accs), '--', label='100% accuracy')
    ax2.set_title('Accuracy')
    ax2.set_ylabel('Accuracy')
    ax2.set_xlabel('Batch number')
    ax2.legend()

    plt.show()
    
    
def get_test_accuraccy(model, test_sequence_dataloader):
    test_accs = []

    for x_test, y_test in test_sequence_dataloader:
        test_preds = model(x_test)
        test_acc = ((test_preds > 0.5).long() == y_test).float().numpy()
        test_accs.extend(list(test_acc))

    return np.mean(test_accs)
    

Этот семинар будет посвящен работе с последовательностями!

План семинара:

    1) Разгадываем головоломку: CNN
    2) Разгадываем головоломку: LSTM

Если успеем:
    
    1) Обучаем CharCNN для Suggest

In [None]:
data = pd.read_csv('https://stepik.org/media/attachments/lesson/537384/sequence_puzzle.csv')
data.head()

In [None]:
data.loc[0]['sequence'], data['class'].loc[0]

In [None]:
data.loc[1]['sequence'], data['class'].loc[1]

### Будем обучать побуквенную модель

Создадим словарь всех токенов:

In [None]:
tokens = set(''.join(data['sequence']))
n_tokens = len(tokens)
token_index_map = {l: i for i, l in enumerate(tokens)}

Далее нам нужно написать класс DataSet, прямо как домашнем семинаре)

In [None]:
class PuzzleDataset(Dataset):
    
    def __init__(self, lines, labels):
        self.lines = lines  # объекты
        self.labels = labels  # ответы
        
    def __len__(self):
        return len(self.lines)  # количество объектов
    
    def __getitem__(self, idx):
        x = self.line_to_tensor(self.lines[idx]).long()  # преобразуем один объект в тензор индексов, тип long()
        y = torch.tensor(self.labels[idx]).float()  # ответ на объекте, тип float()
        return x, y
    
    @staticmethod
    def line_to_tensor(line):
        return torch.tensor([token_index_map[l] for l in line])

Давайте посмотрим, как выглядит закодированный объект:

In [None]:
puzzle_dataset = PuzzleDataset(lines=data['sequence'].values, labels=data['class'].values)
puzzle_dataset[0]

Теперь посмотрим правильно ли закодировались числа, выведем первую букву, первой последовательности:

In [None]:
x, y = puzzle_dataset[0]
data['sequence'].values[0]

In [None]:
token_index_map['s']

In [None]:
x

Рассмотрим класс модели:

In [None]:
class PuzzleCNN(nn.Module):
    
    def __init__(self, vocab_size, hidden_size, kernel_size=1, embedding_dim=8):
        super(PuzzleCNN, self).__init__()
        self.hidden_size = hidden_size
        
        self.embedding = nn.Embedding(vocab_size + 1, embedding_dim)
        self.cnn = nn.Conv1d(
            in_channels=embedding_dim,
            out_channels=hidden_size,
            kernel_size=kernel_size,
            padding='same',
        )
        self.relu = nn.ReLU()
        
        self.linear = nn.Linear(hidden_size, 1)
        self.sigmoid = nn.Sigmoid()


    def forward(self, x):
        x = self.embedding(x)
        x = self.cnn(x.permute(0, 2, 1))
        x, _ = x.max(dim=-1)
        x = self.relu(x)
        x = self.linear(x)
        x = self.sigmoid(x).squeeze()
        return x

Разобъем данные на train_test.

In [None]:
X_train, X_test, y_train, y_test = train_test_split(
    data.sequence, data['class'], test_size=0.05, 
)

train_dataset = PuzzleDataset(lines=X_train.values, labels=y_train.values)
test_dataset = PuzzleDataset(lines=X_test.values, labels=y_test.values)

train_sequence_dataloader = DataLoader(train_dataset, batch_size=512, shuffle=True)
test_sequence_dataloader = DataLoader(test_dataset, batch_size=512, shuffle=True)

In [None]:
one_objext_batch_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True)
x, y = next(iter(one_objext_batch_dataloader))
print(x.shape)
emb_layer = nn.Embedding(num_embeddings=len(token_index_map) + 1, embedding_dim=8,)
emb_output = emb_layer(x)
emb_output.shape

In [None]:
conv1d = nn.Conv1d(in_channels=8, out_channels=16, kernel_size=12, padding='same')

In [None]:
print(emb_output.permute(0, 2, 1).shape)
conv_output = conv1d(emb_output.permute(0, 2, 1))
conv_output.shape

In [None]:
max_output, _ = conv_output.max(dim=2)
max_output.shape

In [None]:
cnn = nn.Conv1d(in_channels=8, out_channels=16, kernel_size=3, padding='same')

In [None]:
cnn(emb_layer(x).permute(0, 2, 1)).shape

In [None]:
# nn layers here

Размерём каждый слой нейронной сети по отдельности



In [None]:
model = PuzzleCNN(
    vocab_size=len(token_index_map),
    hidden_size=32,
    kernel_size=30,
)

optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
loss_func = nn.BCELoss()

n_epochs = 5

losses = []
accs = []

test_losses = []
test_accs = []


for i in range(n_epochs):
    for x_train, y_train in train_sequence_dataloader:

        model.train()
        preds = model(x_train)
        train_loss = loss_func(preds, y_train)
        train_acc = ((preds > 0.5).long() == y_train).float().mean()
        
        optimizer.zero_grad()
        train_loss.backward()
        optimizer.step()
        model.eval()

        x_test, y_test = next(iter(test_sequence_dataloader))
        
        test_preds = model(x_test)
        test_loss = loss_func(test_preds, y_test)
        test_acc = ((test_preds > 0.5).long() == y_test).float().mean()
        
        losses.append(train_loss.item())
        accs.append(train_acc.item())
        
        test_losses.append(test_loss.item())
        test_accs.append(test_acc.item())
        
        plot_progress(losses, accs, test_losses, test_accs)

In [None]:
get_test_accuraccy(model, test_sequence_dataloader)

**Ближайший аналог из практики – vin2param**

Если вы думаете, что задача слишком игрушечная, то ошибаетесь! В Авито есть очень похожая задача – определение параметров автомобиля по VIN номеру:

---

<img src=https://ucarecdn.com/015d7ece-a2cf-415e-990d-70a19e8a7324/ style="width: 600px;">

### Задание на семинаре.

    1) Напишете класс PuzzleLSTM – рекуррентную модель решающую задачу бинарной классификации
        a) Что выдаёт на выход слой LSTM из PyTorch?
        б) Какой слой вам нужен LSTM или LSTMCell?
    2) Обучите модель решать головоломку используя training loop для CNN

In [None]:
class PuzzleLSTM(nn.Module):
    
    def __init__(self,):
        super(PuzzleLSTM, self).__init__()


    def forward(self, x):
        pass

In [None]:
### trainig loop here

### Доп задание для тех кто всё успел:
    1) Обучите пословную CNN для задачи suggest;
    2) Обучите побуквенную CNN для задачи suggest;
    3) Добавьте ещё один слой нейронной сети;
    4) Попробуйте соединить 2 модели в одну.