# Нейронные сети на базе PyTorch

In [1]:
import matplotlib.pyplot as plt

import torch
import torchinfo

import timm

from tqdm.auto import tqdm

ModuleNotFoundError: No module named 'torch'

In [None]:
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor

Мы воспользуемся датасетом MNIST с рукописными цифрами из комплекта torchvision:

In [None]:
dataset = MNIST('mnist-data', download=True)

In [None]:
dataset[0][0]

Изначально он содежит картинки в формате PIL, но мы воспользуемся встроенным преобразованием, чтобы получать сразу массивы.

In [None]:
dataset = MNIST('mnist-data', download=True, transform=ToTensor())

In [None]:
dataset[0][1]

## Тензоры и автоматическое дифференцирование

Попробуем распознавать символы линейно (то есть kX+b).

На входе будет картинка 28х28, а на выходе 10 классов. Поскольку наш алгоритм линейный, картинку придется сплющить в один вектор длиной 28х28=784. Получается матрица весов 784х10. Заполним ее случайными значениями.

In [None]:
weights = torch.randn(28 * 28, 10) / 28

Это так называемая инициализация Ксавье: случайные числа из стандартного нормального распределения, поделенные на корень из размера входных данных.

Важное свойство тензоров torch - возможность автоматического подсчета градиента. Как только мы объявим:

In [None]:
weights.requires_grad_()

все действия над тензором будут учитываться для будущего расчета градиента.

Не забудем задать и вектор смещения (его можно инициализировать нулями).

In [None]:
bias = torch.zeros(10, requires_grad=True)

Для бинарной классификации мы пропускали результаты линейной регрессии через логистическую функцию. В случае мультикласса нам понадобится ее обобщение - **softmax**

$$
Softmax(x_i) = \frac{e^{x_i}}{\sum_j {e^{x_j}}}
$$

In [None]:
def model(x):
    return torch.softmax(x @ weights + bias, dim=-1)

Стоит сразу подумать и о потреблении ресурсов. Как правило, при обучении нейросетей пользуются стохастическим градиентным спуском - обучают небольшими порциями (batch):

In [None]:
batch_size = 64
xb = torch.stack([dataset[i][0].flatten() for i in range(batch_size)])

In [None]:
xb.shape

Теперь, когда мы передадим модели этот пакет данных, мы получим на выходе матрицу 64х10 - вероятности класса для каждого элемента в батче:

In [None]:
output = model(xb)
output.shape

В случае мультикласса порог не так важен: где вероятность больше, тот класс и предсказываем:

In [None]:
preds = torch.argmax(output, dim=1)

In [None]:
preds

Можно сразу оценить и метрику accuracy (она, естественно, будет околонулевая, поскольку мы еще ничего не учили).

In [None]:
y_true = torch.as_tensor([dataset[i][1] for i in range(batch_size)])

In [None]:
y_true

In [None]:
(preds == y_true).float().mean()

### А как учиться?

Вспоминаем, что нам надо минимизировать функцию потерь. В случае классификации это logloss. В torch она реализована как `torch.nn.NLLLoss()`.

In [None]:
loss_func = torch.nn.NLLLoss()
loss = loss_func(output, y_true)

Вот тут в дело и вступает автоматическое дифференцирование! Для этого нам нужно сделать

In [None]:
loss.backward()

А затем остается соответственно сдвинуть веса:

In [None]:
learning_rate = 0.1

In [None]:
with torch.no_grad():
    weights -= weights.grad * learning_rate
    bias -= bias.grad * learning_rate
    weights.grad.zero_()
    bias.grad.zero_()

В конце использованный градиент мы сбрасывем и начинаем следующую итерацию.

In [None]:
bias

Оформляем обучение в виде цикла:

In [None]:
for epoch in tqdm(range(10)):
    for i in range(len(dataset) // batch_size):
        start = i * batch_size
        end = start + batch_size
        xb = torch.stack([dataset[i][0].flatten() for i in range(start, end)])
        yb = y_true = torch.as_tensor([dataset[i][1] for i in range(start, end)])
        pred = model(xb)
        loss = loss_func(pred, yb)

        loss.backward()
        with torch.no_grad():
            weights -= weights.grad * learning_rate
            bias -= bias.grad * learning_rate
            weights.grad.zero_()
            bias.grad.zero_()

In [None]:
print(f'Accuracy на последнем батче: {(pred.argmax(axis=1) == yb).float().mean().item():.2%}')

## Удобства torch

В реальных задачах нам не нужно вручную задавать тензоры и перемножать их. Torch содержит реализации всевозможных слоев, нам нужно только написать класс, наследующий `torch.nn.Module` и реализовать в нем метод `forward()`.

In [None]:
import torch.nn as nn

In [None]:
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.lin = nn.Linear(784, 10)

    def forward(self, xb):
        xb = self.flatten(xb)
        return self.lin(xb)

Не обязательно и делать руками логистику на выходе: есть функция потерь `CrossEntropyLoss()`, применяющая ее автоматически:

In [None]:
model = MyModel()
loss_func = nn.CrossEntropyLoss()

In [None]:
for epoch in tqdm(range(10)):
    for i in range(len(dataset) // batch_size):
        start = i * batch_size
        end = start + batch_size
        xb = torch.stack([dataset[i][0] for i in range(start, end)])
        yb = y_true = torch.as_tensor([dataset[i][1] for i in range(start, end)])
        
        pred = model(xb)
        loss = loss_func(pred, yb)

        loss.backward()
        with torch.no_grad():
            for p in model.parameters():
                p -= p.grad * learning_rate
            model.zero_grad()
print(f'Accuracy на последнем батче: {(pred.argmax(axis=1) == yb).float().mean().item():.2%}')

### Оптимизаторы

In [None]:
from torch.optim import SGD, Adam

In [None]:
model = MyModel()
loss_func = nn.CrossEntropyLoss()

optimizer = SGD(model.parameters(), lr=0.1)

In [None]:
def train(model, optimizer, loss_func):
    for epoch in tqdm(range(10)):
        for i in range(len(dataset) // batch_size):
            start = i * batch_size
            end = start + batch_size
            xb = torch.stack([dataset[i][0] for i in range(start, end)])
            yb = y_true = torch.as_tensor([dataset[i][1] for i in range(start, end)])
            
            pred = model(xb)
            loss = loss_func(pred, yb)
    
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
    print(f'Accuracy на последнем батче: {(pred.argmax(axis=1) == yb).float().mean().item():.2%}')

In [None]:
train(model, optimizer, loss_func)

In [None]:
optimizer = Adam(model.parameters(), lr=3e-4)

train(model, optimizer, loss_func)

### Dataloader

In [None]:
from torch.utils.data import DataLoader

In [None]:
train_dl = DataLoader(dataset, batch_size=64)

In [None]:
def train(model, optimizer, loss_func):
    for epoch in tqdm(range(10)):
        for xb, yb in train_dl:
            pred = model(xb)
            loss = loss_func(pred, yb)
    
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
    print(f'Accuracy на последнем батче: {(pred.argmax(axis=1) == yb).float().mean().item():.2%}')

In [None]:
model = MyModel()
optimizer = Adam(model.parameters(), lr=3e-4)

train(model, optimizer, loss_func)

## Добавляем валидацию

In [None]:
from torch.utils.data import random_split

In [None]:
generator = torch.Generator().manual_seed(177013)

In [None]:
train_set, test_set = random_split(dataset, [0.8, 0.2], generator=generator)

In [None]:
train_dl = DataLoader(train_set, batch_size=64, shuffle=True, num_workers=4)
test_dl = DataLoader(test_set, batch_size=64, shuffle=False, num_workers=4)

In [None]:
model = MyModel()
optimizer = Adam(model.parameters(), lr=3e-4)

In [None]:
def train(model, optimizer, loss_func, num_epochs=10):
    for epoch in tqdm(range(num_epochs)):
        for xb, yb in train_dl:
            pred = model(xb)
            loss = loss_func(pred, yb)
    
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

In [None]:
def test(model):
    accuracies = []
    with torch.no_grad():
        for xb, yb in test_dl:
            pred = model(xb)
            accuracies.append((pred.argmax(axis=1) == yb).float().mean().item())
    print(f'Accuracy на валидации: {torch.mean(torch.as_tensor(accuracies)):.2%}')

In [None]:
train(model, optimizer, loss_func)

In [None]:
test(model)

## Полносвязная сеть с несколькими слоями

Еще один вариант задать сложную сеть - с помощью контейнера `torch.nn.Sequential()`:

In [None]:
model = nn.Sequential(
        nn.Flatten(),
        nn.Linear(784, 128),
        nn.ReLU(),
        nn.Linear(128, 128),
        nn.ReLU(),
        nn.Linear(128, 10),
    )

In [None]:
optimizer = Adam(model.parameters(), lr=3e-4)

In [None]:
train(model, optimizer, loss_func)

In [None]:
test(model)

## Свертка

In [None]:
import numpy as np
from scipy.signal import convolve2d
from PIL import Image

In [None]:
pic = Image.open('photo_2023-01-23_21-07-07.jpg')

In [None]:
pic = pic.resize((256,320))

In [None]:
pic

In [None]:
data = np.asarray(pic)

In [None]:
kernels = [ 
             [[0, 0, 0],  
              [0, 1, 0],
              [0, 0, 0]],
    
             [[0, -1, 0],
              [-1, 5, -1],
              [0, -1, 0]],
    
             [[0, 0, 0],
              [1, -2, 1],
              [0, 0, 0]],
    
             [[0, 1, 0],
              [0, -2, 0],
              [0, 1, 0]],
    
             [[0, 1, 0],
              [1, -4, 1],
              [0, 1, 0]],
    
             [[-1, 0, 1],
              [-2, 0, 2],
              [-1, 0, 1]],
    
            [[-1, -2, -1],
             [0,   0,  0],
             [1,   2,  1]]
]

In [None]:
fig, axes = plt.subplots(2,3, figsize=(10, 10))
for kernel, ax in zip(kernels, axes.flat):
    result = np.array([convolve2d(data[:,:,i], kernel, mode='same') for i in range(3)]).transpose((1,2,0))
    ax.imshow(result, vmin=0, vmax=255);

### Модель классификации на базе сверток

In [None]:
model = nn.Sequential(
    nn.Conv2d(1, 6, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(6, 16, kernel_size=3, padding=1),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Flatten(),
    nn.Linear(784, 120),
    nn.ReLU(),
    nn.Linear(120, 84),
    nn.ReLU(),
    nn.Linear(84, 10),
)


In [None]:
torchinfo.summary(model)

In [None]:
optimizer = Adam(model.parameters(), lr=3e-4)

In [None]:
train(model, optimizer, loss_func)

In [None]:
test(model)

## Transfer learning

In [None]:
from torchvision.models import resnet18

In [None]:
model = resnet18(weights='IMAGENET1K_V1')

In [None]:
torchinfo.summary(model)

### Меняем слои вручную

In [None]:
model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

In [None]:
model.fc = nn.Linear(model.fc.in_features, 10)

In [None]:
test(model)

In [None]:
loss_func = nn.CrossEntropyLoss()

In [None]:
train(model, optimizer, loss_func, num_epochs=1)

In [None]:
test(model)

In [None]:
def train(model, loss_func, num_epochs=10, device='cuda'):
    model = model.to(device)
    optimizer = Adam(model.parameters(), lr=3e-4)
    model.train()
    for epoch in tqdm(range(num_epochs)):
        for xb, yb in train_dl:
            pred = model(xb.to(device))
            loss = loss_func(pred.cpu(), yb)
    
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()

def test(model, device='cuda'):
    model.eval()
    accuracies = []
    with torch.no_grad():
        for xb, yb in test_dl:
            pred = model(xb.to(device))
            accuracies.append((pred.cpu().argmax(axis=1) == yb).float().mean().item())
    print(f'Accuracy на валидации: {torch.mean(torch.as_tensor(accuracies)):.2%}')

In [None]:
train(model, loss_func)

In [None]:
test(model)

### Transfer learning через timm

In [None]:
import timm

In [None]:
model = timm.create_model('eca_nfnet_l1', pretrained=True, num_classes=10, in_chans=1)

In [None]:
train(model, loss_func, num_epochs=1)

In [None]:
test(model)

## Автоэнкодеры

In [None]:
import os
import skimage
import pandas as pd
from torch.utils.data import TensorDataset

In [None]:
def fetch_dataset(attrs_name = "lfw_attributes.txt",
                      images_name = "lfw-deepfunneled",
                      dx=80,dy=80,
                      dimx=64,dimy=64
    ):

    #download if not exists
    if not os.path.exists(images_name):
        print("images not found, donwloading...")
        os.system("wget http://vis-www.cs.umass.edu/lfw/lfw-deepfunneled.tgz -O tmp.tgz")
        print("extracting...")
        os.system("tar xvzf tmp.tgz && rm tmp.tgz")
        print("done")
        assert os.path.exists(images_name)

    if not os.path.exists(attrs_name):
        print("attributes not found, downloading...")
        os.system("wget http://www.cs.columbia.edu/CAVE/databases/pubfig/download/%s" % attrs_name)
        print("done")

    #read attrs
    df_attrs = pd.read_csv("lfw_attributes.txt",sep='\t',skiprows=1,) 
    df_attrs = pd.DataFrame(df_attrs.iloc[:,:-1].values, columns = df_attrs.columns[1:])


    #read photos
    photo_ids = []
    for dirpath, dirnames, filenames in os.walk(images_name):
        for fname in filenames:
            if fname.endswith(".jpg"):
                fpath = os.path.join(dirpath,fname)
                photo_id = fname[:-4].replace('_',' ').split()
                person_id = ' '.join(photo_id[:-1])
                photo_number = int(photo_id[-1])
                photo_ids.append({'person':person_id,'imagenum':photo_number,'photo_path':fpath})

    photo_ids = pd.DataFrame(photo_ids)
    # print(photo_ids)
    #mass-merge
    #(photos now have same order as attributes)
    df = pd.merge(df_attrs,photo_ids,on=('person','imagenum'))

    assert len(df)==len(df_attrs),"lost some data when merging dataframes"

    # print(df.shape)
    #image preprocessing
    all_photos =df['photo_path'].apply(skimage.io.imread)\
                                .apply(lambda img:img[dy:-dy,dx:-dx])\
                                .apply(lambda img: skimage.transform.resize(img,[dimx,dimy]))

    all_photos = np.stack(all_photos.values)#.astype('uint8')
    all_attrs = df.drop(["photo_path","person","imagenum"],axis=1)
    
    return all_photos, all_attrs

In [None]:
data, attrs = fetch_dataset()

In [None]:
for column in attrs.columns:
    attrs[column] = pd.to_numeric(attrs[column])

In [None]:
attrs.head()

In [None]:
dataset = TensorDataset(torch.from_numpy(data).permute(0, 3, 1, 2).float())
train_set, test_set = random_split(dataset, [0.8, 0.2], generator=generator)

### Самостоятельная работа

Создайте даталоадеры для обучающей и тестовой выборки.

In [None]:
train_dl = DataLoader(train_set, batch_size=256, shuffle=True, num_workers=12)
test_dl = DataLoader(test_set, batch_size=256, shuffle=False, num_workers=12)

In [None]:
fig, axes = plt.subplots(2, 4, figsize=(8,4))

batch = next(iter(train_dl))

for i, ax in enumerate(axes.flat):
    ax.axis("off")
    ax.imshow(batch[0][i].permute(1, 2, 0))

In [None]:
LATENT_SIZE = 128

In [None]:
class Autoencoder(nn.Module):
    def __init__(self, input_size=64, input_channels=3, latent_size=LATENT_SIZE):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 4, 3, padding=1, stride=2, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(4),
            nn.Conv2d(4, 16, 3, padding=1, stride=2, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(16),
            nn.Flatten(),
            nn.Linear((input_size)**2, latent_size),
            nn.ReLU(),
            nn.BatchNorm1d(latent_size),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_size, input_size**2),
            nn.ReLU(),
            #nn.BatchNorm1d(input_size**2),
            nn.Unflatten(1, torch.Size([16, input_size//4, input_size//4])),
            nn.ConvTranspose2d(16, 4, 4, stride=2, padding=1, bias=False),
            nn.ReLU(),
            nn.BatchNorm2d(4),
            nn.ConvTranspose2d(4, input_channels, 4, stride=2, padding=1),
            nn.Sigmoid(),
        )
        
    def forward(self, x):
        latent_code = self.encoder(x)
        reconstruction = self.decoder(latent_code)
        return reconstruction, latent_code

In [None]:
model = Autoencoder()
optimizer = Adam(model.parameters(), lr=3e-4)
torchinfo.summary(model, input_size=(1, 3, 64, 64))

In [None]:
criterion = torch.nn.MSELoss()

In [None]:
from IPython.display import clear_output

In [None]:
def train(model, opt, loss_fn, epochs, data_tr, data_val, device='cuda'):
    model = model.to(device)
    train_loss = []
    val_loss = []
    
    for epoch in range(epochs):
        print('* Epoch %d/%d' % (epoch + 1, epochs))

        model.train()
        avg_loss = 0
        for batch in data_tr:
            opt.zero_grad()
            X = torch.as_tensor(batch[0]).to(device)
            reconstruction, vector = model(X)
            loss = loss_fn(reconstruction, X)
            loss.backward()
            opt.step()
            avg_loss += loss / len(data_tr)
        train_loss.append(avg_loss.detach().cpu())

        model.eval()
        avg_loss = 0
        with torch.no_grad():
            for batch in data_val:
                X = torch.as_tensor(batch[0]).to(device)
                reconstruction, vector = model(X)
                loss = loss_fn(reconstruction, X)
                avg_loss += loss / len(data_val)
        val_loss.append(avg_loss.detach().cpu())
    
        # Visualize
        clear_output(wait=True)
        for k in range(5):
            plt.subplot(3, 5, k+1)
            plt.imshow(torch.as_tensor(batch[0])[k].permute(1, 2, 0))
            plt.title('Real')
            plt.axis('off')

            plt.subplot(3, 5, k+6)
            plt.imshow(reconstruction[k].cpu().permute(1, 2, 0))
            plt.title('Output')
            plt.axis('off')
            
        plt.suptitle('%d / %d - loss: %f' % (epoch+1, epochs, avg_loss))
        plt.show()

In [None]:
train(model, optimizer, criterion, 50, train_dl, test_dl)

In [None]:
set_a = TensorDataset(torch.from_numpy(data[attrs['Smiling'] > 0.75]).permute(0, 3, 1, 2).float())
set_b = TensorDataset(torch.from_numpy(data[attrs['Smiling'] < 1]).permute(0, 3, 1, 2).float())

loader_a = DataLoader(set_a, batch_size=256, shuffle=True, num_workers=12)
loader_b = DataLoader(set_b, batch_size=256, shuffle=False, num_workers=12)

faces_a, vectors_a = model(next(iter(loader_a))[0].to('cuda'))
faces_b, vectors_b = model(next(iter(loader_b))[0].to('cuda'))

mean_vector_a = torch.mean(vectors_a, dim=0) - torch.mean(vectors_b, dim=0)
new_faces = model.decoder(vectors_b + mean_vector_a)

fig, axes = plt.subplots(1, 15, figsize=(20,2))
for i, ax in enumerate(axes.flat):
    ax.axis("off")
    ax.imshow(faces_b[i].permute(1, 2, 0).cpu().detach().numpy())
    
fig, axes = plt.subplots(1, 15, figsize=(20,2))
for i, ax in enumerate(axes.flat):
    ax.axis("off")
    ax.imshow(new_faces[i].permute(1, 2, 0).cpu().detach().numpy())

### VAE

In [None]:
dataset = MNIST('mnist-data', download=True, transform=ToTensor())

In [None]:
train_set, test_set = random_split(dataset, [0.8, 0.2], generator=generator)

In [None]:
train_dl = DataLoader(train_set, batch_size=64, shuffle=True, num_workers=12)
test_dl = DataLoader(test_set, batch_size=64, shuffle=False, num_workers=12)

In [None]:
class VAE(nn.Module):
    def __init__(self, input_size=28, input_channels=1, latent_size=LATENT_SIZE):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Conv2d(input_channels, 4, 3, padding=1, stride=2, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(4),
            nn.Conv2d(4, 16, 3, padding=1, stride=2, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(16),
            nn.Flatten(),
            nn.Linear(input_size**2, latent_size*2),
            nn.ReLU(inplace=True),
            nn.BatchNorm1d(latent_size*2),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_size, input_size**2),
            nn.ReLU(inplace=True),
            #nn.BatchNorm1d(input_size**2),
            nn.Unflatten(1, torch.Size([16, input_size//4, input_size//4])),
            nn.ConvTranspose2d(16, 4, 4, stride=2, padding=1, bias=False),
            nn.ReLU(inplace=True),
            nn.BatchNorm2d(4),
            nn.ConvTranspose2d(4, input_channels, 4, stride=2, padding=1),
            nn.Sigmoid(),
        )
        self.latent_size = latent_size
    
    def forward(self, x):
        shapes = x.shape
        mu, logsigma = self.encode(x)
        sample = self.gaussian_sampler(mu, logsigma)
        reconstruction = self.decode(sample)
        return mu, logsigma, reconstruction
        
    def encode(self, x):
        return torch.split(self.encoder(x), self.latent_size, dim=1)
    
    def gaussian_sampler(self, mu, logsigma):
        if self.training:
            # латентный вектор из нормального распределения с параметрами mu и sigma
            return torch.randn_like(logsigma) * torch.exp(logsigma) + mu
        else:
            # на инференсе возвращаем не случайный вектор из нормального распределения, а центральный -- mu. 
            # на инференсе выход автоэнкодера должен быть детерминирован.
            return mu
    
    def decode(self, z):
        return self.decoder(z)

In [None]:
def KL_divergence(mu, logsigma):
    """
    часть функции потерь, которая отвечает за "близость" латентных представлений разных людей
    """
    loss = -0.5 * torch.sum(1 + logsigma - mu ** 2 - torch.exp(logsigma))
    return loss

def log_likelihood(x, reconstruction):
    """
    часть функции потерь, которая отвечает за качество реконструкции (как mse в обычном autoencoder)
    """
    # Чтобы компоненты были соразмерны, понадобится либо reduction='sum' здесь, либо весовой коэффициент ниже:
    loss = torch.nn.BCELoss(reduction='sum')
    return loss(reconstruction, x)

def loss_vae(x, mu, logsigma, reconstruction, kl_weight=1.0):
    return kl_weight * KL_divergence(mu, logsigma) + log_likelihood(x, reconstruction)

In [None]:
loss_func = loss_vae
model = VAE()
optimizer = Adam(model.parameters(), lr=3e-4)

In [None]:
def train(model, opt, loss_fn, epochs, data_tr, data_val, device='cuda'):
    model = model.to(device)
    train_loss = []
    val_loss = []
    
    for epoch in range(epochs):
        print('* Epoch %d/%d' % (epoch + 1, epochs))

        model.train()
        avg_loss = 0
        for X, y in data_tr:
            opt.zero_grad()
            mu, logsigma, reconstruction = model(X.to(device))
            loss = loss_fn(X.to(device), mu, logsigma, reconstruction)
            loss.backward()
            opt.step()
            avg_loss += loss / len(data_tr)
        train_loss.append(avg_loss.detach().cpu())

        model.eval()
        avg_loss = 0
        with torch.no_grad():
            for X, y in data_val:
                mu, logsigma, reconstruction = model(X.to(device))
                loss = loss_fn(X.to(device), mu, logsigma, reconstruction)
                avg_loss += loss / len(data_val)
        val_loss.append(avg_loss.detach().cpu())
    
        # Visualize
        clear_output(wait=True)
        for k in range(5):
            plt.subplot(3, 5, k+1)
            plt.imshow(X[k].cpu().permute(1, 2, 0))
            plt.title('Real')
            plt.axis('off')

            plt.subplot(3, 5, k+6)
            plt.imshow(reconstruction[k].cpu().permute(1, 2, 0))
            plt.title('Output')
            plt.axis('off')
            
        plt.suptitle('%d / %d - loss: %f' % (epoch+1, epochs, avg_loss))
        plt.show()

In [None]:
train(model, optimizer, loss_func, 25, train_dl, test_dl)

In [None]:
mu, logsigma = model.encode(next(iter(test_dl))[0].to('cuda'))

In [None]:
# сгенерируем 32 рандомных вектора размера latent_space
z = torch.randn(batch_size, LATENT_SIZE).to('cuda') * torch.exp(logsigma) + mu
output = model.decode(z)

In [None]:
fig, axes = plt.subplots(4, 8, figsize=(8,4))
for i, ax in enumerate(axes.flat):
    ax.axis("off")
    ax.imshow(output[i].permute(1, 2, 0).cpu().detach().numpy())

In [None]:
from sklearn.manifold import TSNE

In [None]:
def redux(model, dataloader):
    mus = []
    sigmas = []
    y = []
    
    with torch.no_grad():
        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to('cuda')
            mu, logsigma = model.encode(X_batch)
            mus.append(mu.cpu())
            sigmas.append(logsigma.cpu())
            y.extend(y_batch)
    
    features1 = np.vstack(mus)
    features2 = np.vstack(sigmas)
    features = np.hstack([features1, features2])
    y = np.array(y)
    
    points = TSNE(2,  metric='cosine', n_jobs=-1, random_state=177013).fit_transform(features)
    
    plt.figure(figsize=(10, 8))
    for digit in range(10):
        plt.scatter(points[y==digit][:, 0], points[y==digit][:, 1], marker=f'${digit}$', alpha=0.3);

In [None]:
redux(model, test_dl)

# Домашнее задание

## Easy/Normal

Построим модель регрессии на базе полносвязной сети. Превратить датафрейм pandas в тензоры можно, например, вот так:

In [None]:
df = pd.read_excel('/home/daiyousei/Concrete_Data.xls', sheet_name='Sheet1')

In [None]:
df = df.drop_duplicates()

In [None]:
df = df.rename(lambda x: x.split('(')[0].strip().replace(' ', '_').lower(), axis=1)

In [None]:
X = torch.as_tensor(df.drop(['concrete_compressive_strength'], axis=1).values, dtype=torch.float32)
y = torch.as_tensor(df['concrete_compressive_strength'].values.reshape(-1,1), dtype=torch.float32)

In [None]:
# То же, что делает StandardScaler():
standardized_data = (X - torch.mean(X, dim=0)) / torch.std(X, dim=0)

In [None]:
ds = TensorDataset(X, y)

Кроме того, вы можете попробовать `torch.utils.data.datapipes.CsvLoader()`.

Выделите обучающую и тестовую выборки с помощью `torch.utils.data.random_split()`. Создайте даталоадеры для них.

In [None]:
train_set, test_set = ...

In [None]:
train_dl = ...
test_dl = ...

Допишите полносвязную сеть:

In [None]:
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        # Ваш код ниже:
        

    def forward(self, xb):
        # Ваш код ниже:
        

Инициализируем модель. Для регрессии мы будем использовать MSE или MAE(`L1Loss()`).

In [None]:
model = MyModel()
optimizer = Adam(model.parameters(), lr=0.1)
loss_func = nn.MSELoss()

Допишите цикл обучения:

In [None]:
def train(model, optimizer, loss_func, num_epochs=10):
    for epoch in tqdm(range(num_epochs)):
        for xb, yb in train_dl:
            ...

Вы можете воспользоваться этой функцией для тестирования или модифицировать ее. Попытайтесь подобрать структуру сети и гиперпараметры, чтобы улучшить метрику. Чем больше значимых экспериментов, тем выше будет оценено ваше задание.

Если метрика получилась хуже, чем у модели из sklearn, не расстраивайтесь: не везде сложные инструменты дают лучший результат!

In [None]:
def test(model):
    losses = []
    with torch.no_grad():
        for xb, yb in test_dl:
            with torch.no_grad():
                pred = model(xb)
                mse = loss_func(pred, yb)
                losses.append(mse.mean())
    print(f'MSE на валидации: {torch.mean(torch.as_tensor(losses)):.2f}')

In [None]:
train(model, optimizer, loss_func)

In [None]:
test(model)

## Hard

Попробуйте реализовать Conditional VAE для MNIST: подавать нейросети на вход не только вектор, но и желаемый класс. То есть из одного вектора модель должна уметь восстановить любую цифру, которую ей укажут.

Если вам трудно правильно склеить данные, вы можете попробовать построить все на линейных слоях.

In [None]:
# Ваш код:


Обучите модель, проверьте генерацию.

Исследуйте латентное пространство с помощью понижения размерности. Укажите различия с обычным VAE.