In [None]:
import copy
import datetime
import random
import traceback

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data.dataloader import DataLoader, Dataset

from sklearn.model_selection import train_test_split
import torcheval.metrics as metrics

In [None]:
def init_random_seed(value=0):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random``, NumPy's legacy global generator and PyTorch
    (CPU and all CUDA devices), and configures cuDNN for repeatable runs.

    :param value: integer seed shared by all generators
    """
    random.seed(value)
    np.random.seed(value)
    torch.manual_seed(value)
    # manual_seed_all covers every visible GPU, not only the current one.
    torch.cuda.manual_seed_all(value)
    torch.backends.cudnn.deterministic = True
    # The cuDNN autotuner may pick different kernels between runs, so it is
    # switched off together with enabling the deterministic mode.
    torch.backends.cudnn.benchmark = False


def copy_data_to_device(data, device):
    """Recursively move a tensor, or a nested list/tuple of tensors, to ``device``.

    :param data: a tensor, or a list/tuple whose elements are themselves valid inputs
    :param device: target device accepted by ``Tensor.to``
    :return: the moved tensor; containers always come back as lists
    :raises ValueError: if ``data`` is neither a tensor nor a list/tuple
    """
    if isinstance(data, (list, tuple)):
        moved = []
        for item in data:
            moved.append(copy_data_to_device(item, device))
        return moved
    if not torch.is_tensor(data):
        raise ValueError('Недопустимый тип данных {}'.format(type(data)))
    return data.to(device)


def print_grad_stats(model):
    """Print gradient statistics averaged over the parameters that have a gradient.

    For each parameter with a non-``None`` ``grad`` the mean absolute gradient
    and the gradient standard deviation are accumulated; both sums are then
    divided by the number of such parameters. The counter starts at 1e-5 so
    the division is defined even when no parameter has a gradient.

    :param model: object exposing ``parameters()`` (e.g. a ``torch.nn.Module``)
    """
    abs_mean_total = 0
    std_total = 0
    count = 1e-5
    for tensor in model.parameters():
        raw_grad = getattr(tensor, 'grad', None)
        if raw_grad is None:
            continue
        values = raw_grad.data
        abs_mean_total = abs_mean_total + values.abs().mean()
        std_total = std_total + values.std()
        count = count + 1
    avg_abs = abs_mean_total / count
    avg_std = std_total / count
    print(f'Mean grad {avg_abs}, std {avg_std}, n {count}')

def min_max_mse_loss(input, target, false_target, eps=1e-8):
    """Contrastive reconstruction loss: pull towards ``target``, push away from ``false_target``.

    Computes ``mse(input, target) + 1 / (mse(input, false_target) + eps)``.

    :param input: model output
    :param target: tensor the output should match
    :param false_target: tensor the output should stay away from
    :param eps: small constant added to the denominator so the loss (and its
        gradient) stays finite when ``input`` coincides with ``false_target``
    :return: scalar loss tensor
    """
    attract = F.mse_loss(input, target)
    repel = F.mse_loss(input, false_target)
    return attract + 1 / (repel + eps)

def _run_batches(model, dataloader, criterion, device, max_batches, optimizer=None):
    """Run one pass over at most ``max_batches`` batches; return (mean loss, batches processed).

    When ``optimizer`` is given, gradients are computed and a step is taken after
    every batch; otherwise only the forward pass and the loss are evaluated.
    """
    total_loss = 0.0
    batches_n = 0
    for batch_i, (batch_x, batch_y, batch_false) in enumerate(dataloader):
        # ">=" so that exactly max_batches batches are processed, not max_batches + 1.
        if batch_i >= max_batches:
            break

        batch_x = copy_data_to_device(batch_x, device)
        batch_y = copy_data_to_device(batch_y, device)
        batch_false = copy_data_to_device(batch_false, device)

        pred = model(batch_x)
        loss = criterion(pred, batch_y, batch_false)

        if optimizer is not None:
            model.zero_grad()
            loss.backward()
            optimizer.step()

        total_loss += float(loss)
        batches_n += 1

    if batches_n == 0:
        # Explicit message instead of an opaque ZeroDivisionError below.
        raise ValueError('Загрузчик данных не вернул ни одного батча')
    return total_loss / batches_n, batches_n


def train_eval_loop(model, train_dataset, val_dataset, criterion,
                    lr=1e-4, epoch_n=10, batch_size=32,
                    device=None, early_stopping_patience=10, l2_reg_alpha=0,
                    max_batches_per_epoch_train=10000,
                    max_batches_per_epoch_val=1000,
                    data_loader_ctor=DataLoader,
                    optimizer_ctor=None,
                    lr_scheduler_ctor=None,
                    shuffle_train=True,
                    plot=False,
                    dataloader_workers_n=0):
    """
    Training loop. After every epoch the model is evaluated on the held-out set.

    Each batch must unpack into three items ``(x, y, false_y)``; the loss is
    computed as ``criterion(model(x), y, false_y)``.

    :param model: torch.nn.Module - model to train (moved to ``device`` in place)
    :param train_dataset: torch.utils.data.Dataset - training data
    :param val_dataset: torch.utils.data.Dataset - data used for quality evaluation
    :param criterion: loss function taking (prediction, target, false_target)
    :param lr: learning rate
    :param epoch_n: maximum number of epochs
    :param batch_size: number of examples processed per iteration
    :param device: cuda/cpu - device to compute on (auto-detected when None)
    :param early_stopping_patience: largest number of epochs without improvement
        that is tolerated before training is stopped
    :param l2_reg_alpha: L2 regularisation coefficient; passed as ``weight_decay``
        to the default Adam optimizer only - it is NOT forwarded to ``optimizer_ctor``
    :param max_batches_per_epoch_train: maximum number of training iterations per epoch
    :param max_batches_per_epoch_val: maximum number of validation iterations per epoch
    :param data_loader_ctor: callable that turns a dataset into batches
        (torch.utils.data.DataLoader by default)
    :param optimizer_ctor: optional callable ``(params, lr=...) -> optimizer``;
        when None, Adam with ``weight_decay=l2_reg_alpha`` is used
    :param lr_scheduler_ctor: optional callable ``(optimizer) -> scheduler``;
        ReduceLROnPlateau is stepped with the validation loss, any other
        scheduler is stepped without arguments
    :param shuffle_train: whether to shuffle the training data
    :param plot: currently unused
    :param dataloader_workers_n: number of worker processes for both data loaders
    :return: tuple of two elements:
        - mean validation loss on the best epoch
        - the best model (a deep copy)
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device)
    model.to(device)

    if optimizer_ctor is None:
        optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=l2_reg_alpha)
    else:
        optimizer = optimizer_ctor(model.parameters(), lr=lr)

    lr_scheduler = lr_scheduler_ctor(optimizer) if lr_scheduler_ctor is not None else None

    train_dataloader = data_loader_ctor(train_dataset, batch_size=batch_size, shuffle=shuffle_train,
                                        num_workers=dataloader_workers_n)
    val_dataloader = data_loader_ctor(val_dataset, batch_size=batch_size, shuffle=False,
                                      num_workers=dataloader_workers_n)

    best_val_loss = float('inf')
    best_epoch_i = 0
    best_model = copy.deepcopy(model)

    for epoch_i in range(epoch_n):
        try:
            epoch_start = datetime.datetime.now()
            print('Эпоха {}'.format(epoch_i))

            model.train()
            mean_train_loss, train_batches_n = _run_batches(
                model, train_dataloader, criterion, device,
                max_batches_per_epoch_train, optimizer=optimizer)
            print('Эпоха: {} итераций, {:0.2f} сек'.format(train_batches_n,
                                                           (datetime.datetime.now() - epoch_start).total_seconds()))
            print('Среднее значение функции потерь на обучении', mean_train_loss)

            model.eval()
            with torch.no_grad():
                mean_val_loss, _ = _run_batches(
                    model, val_dataloader, criterion, device,
                    max_batches_per_epoch_val)
            print('Среднее значение функции потерь на валидации', mean_val_loss)

            if mean_val_loss < best_val_loss:
                best_epoch_i = epoch_i
                best_val_loss = mean_val_loss
                best_model = copy.deepcopy(model)
                print('Новая лучшая модель!')
            elif epoch_i - best_epoch_i > early_stopping_patience:
                print('Модель не улучшилась за последние {} эпох, прекращаем обучение'.format(
                    early_stopping_patience))
                break

            if lr_scheduler is not None:
                # Only ReduceLROnPlateau consumes a metric; for every other
                # scheduler a positional argument would be read as an epoch index.
                if isinstance(lr_scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                    lr_scheduler.step(mean_val_loss)
                else:
                    lr_scheduler.step()

            print()
        except KeyboardInterrupt:
            print('Досрочно остановлено пользователем')
            break
        except Exception as ex:
            # Best effort: report the failure and return the best model found so far.
            print('Ошибка при обучении: {}\n{}'.format(ex, traceback.format_exc()))
            break

    return best_val_loss, best_model


def predict_with_model(model, dataset, device=None, batch_size=32, num_workers=0, return_labels=False):
    """
    Apply a model to every item of a dataset and collect the outputs.

    Each batch must unpack into three items; the first is fed to the model and
    the second is what gets returned as "labels" when requested.

    :param model: torch.nn.Module - trained model (moved to ``device`` and put in eval mode)
    :param dataset: torch.utils.data.Dataset - data to apply the model to
    :param device: cuda/cpu - device to compute on (auto-detected when None)
    :param batch_size: number of examples processed per iteration
    :param num_workers: number of DataLoader worker processes
    :param return_labels: when True, also return the second element of every batch
    :return: numpy.array of shape len(dataset) x *, or a tuple
        (predictions, labels) when ``return_labels`` is True
    """
    import tqdm  # imported here because the notebook's import cell does not provide it

    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    device = torch.device(device)
    model.to(device)
    model.eval()

    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    results_by_batch = []
    labels = []
    with torch.no_grad():
        # len(dataloader) is the integer batch count (it accounts for the last
        # partial batch), unlike the fractional len(dataset) / batch_size.
        for batch_x, batch_y, _ in tqdm.tqdm(dataloader, total=len(dataloader)):
            batch_x = copy_data_to_device(batch_x, device)

            if return_labels:
                labels.append(batch_y.detach().cpu().numpy())

            batch_pred = model(batch_x)
            results_by_batch.append(batch_pred.detach().cpu().numpy())

    predictions = np.concatenate(results_by_batch, 0)
    if return_labels:
        return predictions, np.concatenate(labels, 0)
    return predictions

### Data

In [None]:
# Read the TMNIST CSV from the Kaggle input directory and preview the first rows.
df = pd.read_csv("/kaggle/input/tmnist-typeface-mnist/TMNIST_Data.csv")
df.head()

In [None]:
# Everything from the third column onward (the first two columns are skipped).
X = df.iloc[:,2:]
# The 'labels' column on its own.
y = df['labels']
# NOTE(review): this pairs the i-th distinct 'labels' value with the i-th
# distinct 'names' value (each in order of first appearance) and stops at the
# shorter list; it is not a per-row label -> name lookup. Confirm this is intended.
dict_of_labels = dict(zip(df['labels'].unique(), df['names'].unique()))

In [None]:
# Display the labels -> names dictionary.
dict_of_labels

In [None]:
# One example per label value 0..9 on a 2 x 5 grid: the first row matching each
# label is reshaped to 28 x 28 and titled with the value of its first column.
rows, cols = 2, 5
fig, ax = plt.subplots(rows, cols, figsize=(cols * 3, rows * 3))
for num_class, axis in enumerate(ax.flat):
    # `num_class` is referenced by name inside the query string.
    class_series = df.query("labels == @num_class").iloc[0, :]
    pixels = np.array(class_series.iloc[2:], dtype=float)
    axis.imshow(pixels.reshape(28, 28), cmap="binary")
    axis.set_title(f"{class_series.iloc[0]}")

In [None]:
# Ten distinct random rows on a 2 x 5 grid, each reshaped to 28 x 28 and titled
# with the value of its first column.
subdf = df.iloc[np.random.choice(len(df), 10, replace=False)]
rows, cols = 2, 5
fig, ax = plt.subplots(rows, cols, figsize=(cols * 3, rows * 3))
for num_row, axis in enumerate(ax.flat):
    class_series = subdf.iloc[num_row, :]
    pixels = np.array(class_series.iloc[2:], dtype=float)
    axis.imshow(pixels.reshape(28, 28), cmap="binary")
    axis.set_title(f"{class_series.iloc[0]}")

In [None]:
# Stratified 60 / 20 / 20 split into train / test / validation.
# A fixed random_state makes the split identical on every Restart & Run All.
SPLIT_SEED = 0
df_train, df_holdout = train_test_split(
    df, test_size=0.4, stratify=df['labels'], random_state=SPLIT_SEED)
# The 40% hold-out part is halved, again keeping the label proportions.
df_test, df_val = train_test_split(
    df_holdout, test_size=0.5, stratify=df_holdout['labels'], random_state=SPLIT_SEED)

In [None]:
class TMNIST_dataset(Dataset):
    """Dataset of pixel vectors that yields (image, image, false image) triples.

    The frame passed in must contain the columns "names" and "labels"; every
    other column is treated as a pixel value.

    Attributes:
        labels: the "names"/"labels" columns, indexed 0..len-1 like the items.
        dataset: the remaining (pixel) columns, indexed 0..len-1.
    """

    def __init__(self, pd_df):
        frame = pd_df.reset_index(drop=True)
        # Taken from the re-indexed frame so that row i of `labels` always
        # describes item i of the dataset.
        self.labels = frame[["names", "labels"]]
        self.dataset = frame.drop(["names", "labels"], axis=1)
        # Converted once: per-item DataFrame row access is far slower than
        # indexing a contiguous float32 matrix.
        self._pixels = self.dataset.to_numpy(dtype=np.float32)

    def __len__(self):
        return len(self.dataset)

    def _draw_false_index(self, index):
        """Pick a random item position different from ``index`` (when one exists)."""
        size = len(self._pixels)
        if size < 2:
            return index
        # torch's generator is seeded separately in every DataLoader worker,
        # whereas NumPy's global state is copied as-is into forked workers.
        shift = int(torch.randint(1, size, (1,)))
        return (index + shift) % size

    def __getitem__(self, index):
        """Return [image, image, false image] as float32 tensors.

        The image serves as both the model input and the reconstruction target;
        the false image is another, randomly chosen item.
        """
        img_vector = torch.tensor(self._pixels[index])
        false_vector = torch.tensor(self._pixels[self._draw_false_index(index)])
        return [img_vector, img_vector, false_vector]

In [None]:
# Wrap each split in a TMNIST_dataset so it can be fed to a DataLoader.
train_dataset = TMNIST_dataset(df_train)
validation_dataset = TMNIST_dataset(df_val)
test_dataset = TMNIST_dataset(df_test)

In [None]:
class TMNIST_model(nn.Module):
    """Fully connected autoencoder with a narrow bottleneck.

    Encoder: BatchNorm -> in_shape -> 400 -> 200 -> 50 -> n_components -> BatchNorm.
    Decoder: n_components -> 50 -> 200 -> 400 -> BatchNorm -> in_shape.
    SELU is applied between the linear layers. Every layer is available both as
    a named attribute and as a member of ``fc_encoder`` / ``fc_decoder``.

    :param in_shape: length of the input (and reconstructed) vector
    :param n_components: width of the bottleneck
    """

    def __init__(self, in_shape=784, n_components=2):
        super().__init__()

        selu = nn.SELU()
        self.activation = selu

        # Encoder layers, created in the order they are applied.
        self.in_batchnorm = nn.BatchNorm1d(in_shape)
        self.linear_11 = nn.Linear(in_shape, 400)
        self.linear_12 = nn.Linear(400, 200)
        self.linear_13 = nn.Linear(200, 50)
        self.linear_14 = nn.Linear(50, n_components)
        self.hidden_batchnorm = nn.BatchNorm1d(n_components)

        # Decoder layers mirror the encoder widths.
        self.linear_21 = nn.Linear(n_components, 50)
        self.linear_22 = nn.Linear(50, 200)
        self.linear_23 = nn.Linear(200, 400)
        self.batchnorm_23 = nn.BatchNorm1d(400)
        self.linear_24 = nn.Linear(400, in_shape)

        encoder_stack = [
            self.in_batchnorm,
            self.linear_11, selu,
            self.linear_12, selu,
            self.linear_13, selu,
            self.linear_14,
            self.hidden_batchnorm,
        ]
        self.fc_encoder = nn.Sequential(*encoder_stack)

        decoder_stack = [
            self.linear_21, selu,
            self.linear_22, selu,
            self.linear_23,
            self.batchnorm_23, selu,
            self.linear_24,
        ]
        self.fc_decoder = nn.Sequential(*decoder_stack)

    def forward(self, data):
        """Encode ``data`` to the bottleneck and decode it back to ``in_shape`` values."""
        return self.fc_decoder(self.fc_encoder(data))

In [None]:
# Instantiate the autoencoder with its default sizes (in_shape=784, n_components=2).
model_1 = TMNIST_model()

In [None]:
# Print the module tree of the model.
print(model_1)

In [None]:
# Train the autoencoder with the contrastive MSE loss; each epoch uses roughly
# 50 large batches (batch_size = len(train_dataset) // 50).
# NOTE(review): because optimizer_ctor is passed, train_eval_loop builds the
# optimizer as optimizer_ctor(model.parameters(), lr=lr) and never forwards
# l2_reg_alpha, so the 0.15 below has no effect - confirm whether weight decay
# was intended here.
# ReduceLROnPlateau is constructed with its default settings.
best_loss, best_model = train_eval_loop(model=model_1, 
                train_dataset=train_dataset, 
                val_dataset=validation_dataset, 
                criterion=min_max_mse_loss,
                lr=1e-2, 
                epoch_n=200, 
                batch_size=len(train_dataset)//50,
                device="cuda:0" if torch.cuda.is_available() else "cpu", 
                early_stopping_patience=20, 
                l2_reg_alpha=0.15,
                max_batches_per_epoch_train=10000,
                max_batches_per_epoch_val=1000,
                data_loader_ctor=DataLoader,
                optimizer_ctor=torch.optim.Adam,
                lr_scheduler_ctor=torch.optim.lr_scheduler.ReduceLROnPlateau,
                shuffle_train=True,
                dataloader_workers_n=2)

In [None]:
# Run only the encoder half of the best model over the test set to get the
# bottleneck outputs for every item.
result = predict_with_model(best_model.fc_encoder, test_dataset)

In [None]:
# Shape of the encoder output array.
result.shape

In [None]:
# Quick look: first encoder output column against the second.
plt.scatter(result[:, 0], result[:, 1])

In [None]:
# Distinct values of the "names" column stored alongside the test dataset.
test_dataset.labels["names"].unique()

In [None]:
# Axis limits and tick positions
x_lim = (-6, 6)
y_lim = (-6, 6)
grid_step = 0.5
grid_x = np.arange(x_lim[0], x_lim[1] + grid_step, grid_step).round(1)
grid_y = np.arange(y_lim[0], y_lim[1] + grid_step, grid_step).round(1)


# Scatter of the two encoder outputs, coloured by the "labels" column
plt.figure(figsize=(9,8), dpi=100)
plt.title("Дескрипторы записей со скрытого слоя", fontsize=16)
sns.scatterplot(x=result[:, 0], y=result[:, 1], hue=test_dataset.labels["labels"])
plt.xlabel("Первый выход промежуточного слоя", fontsize=16)
plt.ylabel("Второй выход промежуточного слоя", fontsize=16)
plt.xlim(x_lim)
plt.ylim(y_lim)
plt.xticks(grid_x, grid_x, fontsize=16, rotation=40)
plt.yticks(grid_y, grid_y, fontsize=16)
# axvline's ymin/ymax are fractions of the axes height (0..1), not data
# coordinates; the defaults already span the full height.
plt.axvline(0, linewidth=1, color="black")
plt.grid(linestyle="--")
plt.show()

In [None]:
# Run the full best model (encoder followed by decoder) over the test set.
images_out = predict_with_model(best_model, test_dataset)

In [None]:
# Ten randomly drawn test items (drawn with replacement): the input image on
# the left, the model output for the same item on the right.
images_num = 10
random_num = np.random.choice(np.arange(0, len(test_dataset)), images_num)

fig, ax = plt.subplots(images_num, 2, figsize=(12, 6 * images_num))
for (input_axis, output_axis), random_image in zip(ax, random_num):
    input_axis.imshow(test_dataset[random_image][0].reshape(28, 28))
    output_axis.imshow(images_out[random_image].reshape(28, 28))