In [127]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torchvision import datasets
from torch.utils.data import DataLoader
import numpy
import time
import os
import random

# Seed every RNG the notebook relies on (Python, NumPy, PyTorch) with one value
# so that a fresh "Restart & Run All" starts from the same random state.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

In [2]:
# Open MNIST from the current working directory; download=False means the
# files must already be there.
data = datasets.MNIST(root=os.getcwd(), download=False)

In [3]:
# Scratch check: a 3x3 integer tensor in which every row is [-1, 0, 1].
torch.tensor(np.array([[-1, 0, 1] for _ in range(3)]))
# [[-1, 2, -1]]*3

tensor([[-1,  0,  1],
        [-1,  0,  1],
        [-1,  0,  1]], dtype=torch.int32)

In [None]:

# Hyper-parameters
num_epoch = 1     # number of passes over the dataset
cuda_device = -1  # CUDA device index; -1 selects the CPU
batch_size = 128  # samples per optimisation step
# Device string consumed by .to(...): 'cpu' or 'cuda:<index>'.
device = 'cpu' if cuda_device == -1 else f'cuda:{cuda_device}'
# input_dimension = 28*28
# hidden_dimension = 14*14
# latent_dimension = 7*7




def collate_fn(data):
    """Stack a list of samples into one batch dictionary.

    Args:
        data: sequence of samples; element 0 of each sample is the picture
            (anything ``numpy.array`` accepts) and element 1 is its target.

    Returns:
        dict with
        ``'data'``   - the stacked pictures as a tensor, divided by 255;
        ``'target'`` - the stacked targets as a tensor.
    """
    stacked_pics = numpy.array([numpy.array(sample[0]) for sample in data])
    stacked_targets = numpy.array([sample[1] for sample in data])

    batch = {}
    # Dividing by 255 rescales pixel values (0..255 for 8-bit images) to 0..1.
    batch['data'] = torch.from_numpy(stacked_pics) / 255
    batch['target'] = torch.from_numpy(stacked_targets)
    return batch

#model
class MyMNISTEncoder(nn.Module):
    """Convolutional encoder: conv -> pool -> ReLU twice, then conv -> ReLU.

    Each of the two 2x2 max-pools halves the spatial size; the final
    convolution emits a single channel. The size notes on the layers
    (28 -> 14 -> 7) assume a 28x28 input.

    Args:
        in_channel: number of channels of the input tensor.
        hidden_channel: number of channels produced by the first two convolutions.
    """

    def __init__(self, in_channel: int, hidden_channel: int):
        super().__init__()
        # 3x3 convolution settings that keep the spatial size unchanged.
        same_3x3 = dict(kernel_size=3, padding=1, stride=1)

        self.conv1 = nn.Conv2d(in_channel, hidden_channel, kernel_size=5, padding=2, stride=1)  # 28 x 28
        self.pool1 = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(hidden_channel, hidden_channel, **same_3x3)  # 14 x 14
        self.pool2 = nn.MaxPool2d(2, 2)
        self.conv3 = nn.Conv2d(hidden_channel, 1, **same_3x3)  # 7 x 7

        self.activation = nn.ReLU()

    def forward(self, x):
        """Encode ``x`` (N, in_channel, H, W) into a (N, 1, H/4, W/4) map."""
        for conv, pool in ((self.conv1, self.pool1), (self.conv2, self.pool2)):
            x = self.activation(pool(conv(x)))
        return self.activation(self.conv3(x))

class MyMNISTDecoder(nn.Module):
    """Convolutional decoder: conv -> 2x upsample -> ReLU twice, then conv -> ReLU.

    Takes a single-channel map and quadruples its spatial size (7 -> 14 -> 28
    according to the size notes on the layers), producing ``in_channel``
    output channels. The final ReLU keeps the output non-negative.

    Args:
        in_channel: number of channels of the reconstructed output.
        hidden_channel: number of channels produced by the first two convolutions.
    """

    def __init__(self,
                 in_channel: int,
                 hidden_channel: int,
                 ):
        super().__init__()
        self.conv1 = nn.Conv2d(1, hidden_channel, kernel_size=5, padding=2, stride=1) # 7 x 7
        # nn.UpsamplingNearest2d is deprecated; nn.Upsample with mode='nearest'
        # performs the same nearest-neighbour upsampling.
        self.unpool1 = nn.Upsample(scale_factor=2, mode='nearest')
        self.conv2 = nn.Conv2d(hidden_channel, hidden_channel, kernel_size=3, padding=1, stride=1) # 14 x 14
        self.unpool2 = nn.Upsample(scale_factor=2, mode='nearest')
        self.conv3 = nn.Conv2d(hidden_channel, in_channel, kernel_size=3, padding=1, stride=1) # 28 x 28

        self.activation = nn.ReLU()

    def forward(self, x):
        """Decode ``x`` (N, 1, H, W) into a (N, in_channel, 4*H, 4*W) tensor."""
        x = self.activation(self.unpool1(self.conv1(x)))
        x = self.activation(self.unpool2(self.conv2(x)))
        x = self.activation(self.conv3(x))

        return x

class MyAutoEncoder(nn.Module):
    """Autoencoder that chains ``MyMNISTEncoder`` and ``MyMNISTDecoder``.

    Args:
        input_dimension: forwarded to both halves as their ``in_channel``
            argument (a channel count, despite the name).
        hidden_dimension: forwarded to both halves as their ``hidden_channel``
            argument.
    """

    def __init__(self, input_dimension, hidden_dimension):
        super().__init__()
        self.encoder = MyMNISTEncoder(input_dimension, hidden_dimension)
        self.decoder = MyMNISTDecoder(input_dimension, hidden_dimension)

    def forward(self, x):
        """Return the decoder's reconstruction of the encoded ``x``."""
        return self.decoder(self.encoder(x))


# Model: 1 image channel, 50 hidden channels; placed on `device` in training mode.
model = MyAutoEncoder(1, 50).to(device)
model.train()

# Optimizer
optim = torch.optim.Adam(params=model.parameters(), lr=0.001)

# LR scheduler: none configured.

# Dataset: read from the current working directory (download=False, so the
# files must already be present).
dataset = datasets.MNIST(root=os.getcwd(), download=False)

# Loss: mean squared error between reconstruction and input.
loss_func = nn.MSELoss()


# train loop
time_start = time.time()
# A single loader serves every epoch: with shuffle=True each new iteration over
# it reshuffles, so there is no need to rebuild it per epoch (nor to build a
# throwaway loader just to read its length).
data_loader = DataLoader(dataset=dataset,
                         batch_size=batch_size,
                         shuffle=True,
                         collate_fn=collate_fn,
                         drop_last=True,
                         )
len_dl = len(data_loader)            # batches per epoch
step_width = len(str(len_dl))        # zero-pad width of the step counter
# 0-based batch indices after which progress is reported mid-epoch.
report_steps = {0, 20, 50, len_dl // 2 - 1}
model.train()  # make sure the model is in training mode before optimising
print(time.ctime())
for epoch in range(num_epoch):
    time_start_epoch = time.time()
    print('Началась эпоха: ' + str(epoch+1))
    for i, batch in enumerate(data_loader):
        # unsqueeze(1) inserts the channel axis the conv layers expect.
        # Named `images` so the `data` dataset from an earlier cell is not clobbered.
        images = batch['data'].to(device).unsqueeze(1)
        optim.zero_grad()
        predict = model(images)
        # Autoencoder objective: the reconstruction target is the input itself.
        loss = loss_func(predict, images)
        loss.backward()
        optim.step()
        if i in report_steps or i == len_dl - 1:
            # loss.item() logs the plain number instead of the tensor repr
            # (which carried the grad_fn noise).
            print('    ' + f'{i+1:0{step_width}}' + '/' + str(len_dl) + ': ' + f'{loss.item():.4f}')
            if i in report_steps:
                print('    ...')
    print('Эпоха длилась: ' + time.strftime('%H:%M:%S', time.gmtime(time.time() - time_start_epoch)))
    print('-------------------------------------------------------')

print('Всё обучение заняло: ' + time.strftime('%H:%M:%S', time.gmtime(time.time() - time_start)))


# Qualitative check: reconstruct one image and show it next to the original.
sample_id = 784  # index into dataset.data; renamed from `id` to stop shadowing the builtin
original = dataset.data[sample_id].float() / 255
# Add batch and channel axes and move the input to the model's device
# (without this the forward pass fails when a CUDA device is selected).
test = original.unsqueeze(0).unsqueeze(0).to(device)
model.eval()
with torch.no_grad():  # inference only: no autograd graph needed
    predict = model(test)
plt.imshow(original)
plt.show()
# .cpu() is required before .numpy() when the model lives on a GPU.
plt.imshow(predict[0][0].cpu().numpy())
plt.show()

Mon Apr 11 19:51:12 2022
Началась эпоха: 1
    001/468: tensor(0.1055, grad_fn=<MseLossBackward0>)
    ...
    021/468: tensor(0.0510, grad_fn=<MseLossBackward0>)
    ...


In [105]:
# id = 32
# test = dataset.data[id].unsqueeze(0).unsqueeze(0).float() / 255
# # test.size()
# predict = model(test)
# plt.imshow(dataset.data[id].float() / 255)
# plt.show()
# plt.imshow(predict[0][0].detach().numpy())
# plt.show()

In [85]:
# predict[0][0]