<a href="https://colab.research.google.com/github/NataliaBabushkina/ML_homework/blob/main/Homework1(PyTorch).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# /configs/config.py
import os

# Parent of the directory that contains this config file.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Directory used for the MNIST data, and the name of the pickled dataset cache stored in it.
PATH = os.path.join(ROOT_DIR, "data/mnist")
FILENAME = 'mnist.pkl'

# Remote location of the archives and the list of [dataset key, archive file name] pairs.
# Order matters: the dataset code slices this list by position and treats the first two
# entries as image archives and the last two as label archives.
base_url = "http://yann.lecun.com/exdb/mnist/"
filename = [
    ["training_images", "train-images-idx3-ubyte.gz"],
    ["test_images", "t10k-images-idx3-ubyte.gz"],
    ["training_labels", "train-labels-idx1-ubyte.gz"],
    ["test_labels", "t10k-labels-idx1-ubyte.gz"]
]

# Model dimensions: 784 = 28 * 28 flattened pixels per image, 10 model outputs.
input_size = 784
output_size = 10

# Training hyperparameters: number of epochs, samples per batch, SGD learning rate.
nrof_epochs = 5
batch_size = 128
lr = 0.05

In [None]:
# /dataloader/data_transforms.py
import numpy as np
import torch


def normalize_images(images):
    """Shift and scale pixel values: ``(pixel - 127) / 255`` as float32.

    Args:
        images: array-like of pixel values.

    Returns:
        A float32 numpy array with the same shape as *images*.
    """
    pixels = np.asarray(images, dtype=np.float32)
    # Out-of-place arithmetic on purpose: ``np.asarray`` may hand back the
    # caller's own array, which must not be modified.
    centered = pixels - 127
    return centered / 255


def convert2tensor(images):
    """Build a ``torch.Tensor`` from *images* via ``torch.tensor``.

    Args:
        images: data accepted by ``torch.tensor`` (e.g. a numpy array).

    Returns:
        The resulting tensor.
    """
    as_tensor = torch.tensor(images)
    return as_tensor

In [None]:
# /dataloader/simple_dataloader.py
import torch
import numpy as np


class SimpleDataLoader:
    """Minimal in-memory batch iterator over one split of a dataset.

    Args:
        dataset: object exposing ``get_dataset(dataset_type)`` that returns an
            ``(images, labels)`` pair accepted by ``torch.tensor``.
        config: object exposing an integer ``batch_size`` attribute.
        dataset_type: split key forwarded to ``dataset.get_dataset``.
        shuffle: reshuffle the samples at the start of every pass.
        transforms: callables applied, in order, to every image batch.
            ``None`` (the default) means no transforms.
        skip_last: drop the trailing batch when it is smaller than
            ``batch_size``.

    Attributes:
        dataset_size: number of samples in the split.
        epoch_size: number of batches produced by one pass.
    """

    def __init__(self, dataset, config, dataset_type, shuffle=False, transforms=None, skip_last=False):
        self.state = 0
        self.shuffle = shuffle
        self.config = config
        # A ``None`` default avoids sharing one mutable list between loaders.
        self.transforms = [] if transforms is None else transforms
        self.skip_last = skip_last
        images, labels = dataset.get_dataset(dataset_type)
        # Convert the data to torch tensors once, up front.
        self.images = torch.tensor(images)
        self.labels = torch.tensor(labels)
        self.dataset_size = len(self.labels)
        # Make the batch count available before the first pass as well.
        self.epoch_size = self._count_batches()

    def _count_batches(self):
        """Return how many batches one pass over the data yields."""
        full_batches, remainder = divmod(len(self.images), self.config.batch_size)
        # Only add a trailing batch when there really are leftover samples;
        # otherwise an evenly divisible dataset would produce an empty batch.
        if remainder and not self.skip_last:
            full_batches += 1
        return full_batches

    def batch_generator(self):
        """Yield ``(batch_data, batch_labels)`` pairs for one pass over the data.

        When shuffling is enabled the stored images and labels are re-permuted
        (together) before the pass starts. ``epoch_size`` is refreshed on every
        call.
        """
        if self.shuffle:
            self.images, self.labels = self.shuffle_data(self.images, self.labels)
        batch_size = self.config.batch_size
        self.epoch_size = self._count_batches()
        for i in range(self.epoch_size):
            start = i * batch_size
            end = start + batch_size
            batch_labels = self.labels[start:end]
            batch_data = self.images[start:end, :]
            for transform in self.transforms:
                batch_data = transform(batch_data)
            yield batch_data, batch_labels

    def shuffle_data(self, images, labels):
        """Return *images* and *labels* reordered by one shared random permutation."""
        indexes = np.random.permutation(len(labels))
        return images[indexes], labels[indexes]

In [None]:
# /dataset/mnist_dataset.py
from urllib import request
import pickle
import gzip
import numpy as np
import os


class MNIST:
    """MNIST dataset backed by a local pickle cache.

    On construction the cache file ``config.PATH/config.FILENAME`` is loaded;
    when it is missing, the archives listed in ``config.filename`` are
    downloaded from ``config.base_url`` into ``config.PATH`` (archives that
    are already present are reused), converted and cached first.

    Args:
        config: object exposing ``PATH``, ``FILENAME``, ``base_url`` and
            ``filename`` (a list of ``[dataset key, archive name]`` pairs whose
            first two entries are image archives and last two label archives).
    """

    def __init__(self, config):
        self.config = config
        self._read_dataset()

    def _read_dataset(self):
        """Load the cached dataset into ``self.dataset``, building the cache if needed."""
        cache_path = os.path.join(self.config.PATH, self.config.FILENAME)
        if not os.path.exists(cache_path):
            self._download_dataset()
        # Each image is stored as a flat vector of 28 * 28 values.
        # NOTE(security): pickle.load can execute arbitrary code, so this must
        # only ever read the cache file written by _save_mnist.
        with open(cache_path, "rb") as f:
            data = pickle.load(f, encoding="latin-1")
        self.dataset = {'train': {'images': data['training_images'], 'labels': data['training_labels']},
                        'test': {'images': data['test_images'], 'labels': data['test_labels']}}

    def get_dataset(self, dataset_type):
        """Return the ``(images, labels)`` pair of the ``'train'`` or ``'test'`` split."""
        split = self.dataset[dataset_type]
        return split['images'], split['labels']

    def _download_dataset(self):
        """Fetch every archive that is not yet in ``config.PATH``, then build the cache."""
        os.makedirs(self.config.PATH, exist_ok=True)
        for _, archive in self.config.filename:
            # Join with a separator so the archive lands inside PATH, and test
            # for the archive itself so an existing download is not repeated.
            target = os.path.join(self.config.PATH, archive)
            if not os.path.exists(target):
                print("Downloading " + archive + "...")
                request.urlretrieve(self.config.base_url + archive, target)
                print("Download complete.")
        self._save_mnist()

    def _load_archive(self, archive, offset):
        """Read a gzip archive from ``config.PATH`` as uint8 values, skipping *offset* leading bytes."""
        with gzip.open(os.path.join(self.config.PATH, archive), 'rb') as f:
            return np.frombuffer(f.read(), np.uint8, offset=offset)

    def _save_mnist(self):
        """Convert the downloaded archives to numpy arrays and pickle them as the cache."""
        mnist = {}
        # Image archives: skip the first 16 bytes, one row of 28 * 28 values per image.
        for key, archive in self.config.filename[:2]:
            mnist[key] = self._load_archive(archive, offset=16).reshape(-1, 28 * 28)
        # Label archives: skip the first 8 bytes, one integer label per sample.
        for key, archive in self.config.filename[-2:]:
            mnist[key] = self._load_archive(archive, offset=8).astype(int)
        with open(os.path.join(self.config.PATH, self.config.FILENAME), 'wb') as f:
            pickle.dump(mnist, f)
        print("Save complete.")

In [None]:
# /executors/train_mnist.py
import numpy as np
import torch

from dataset.mnist_dataset import MNIST
from dataloader.simple_dataloader import SimpleDataLoader
from nets.simple_net import SimpleNet
from metrics.accuracy import accuracy
from losses.simple_loss import nll
from configs import config
from dataloader.data_transforms import normalize_images, convert2tensor


class Trainer:
    """Builds the dataset, loaders and model from a config and runs train/eval epochs."""

    def __init__(self, config):
        self.config = config
        self.dataset = MNIST(config)
        # Training data: shuffled every pass, incomplete trailing batch dropped.
        self.train_loader = SimpleDataLoader(
            self.dataset, config, 'train',
            shuffle=True,
            skip_last=True,
            transforms=[normalize_images, convert2tensor],
        )
        # Test data: fixed order, every sample kept.
        self.test_loader = SimpleDataLoader(
            self.dataset, config, 'test',
            shuffle=False,
            transforms=[normalize_images, convert2tensor],
        )
        self.model = SimpleNet(config)

    def fit(self):
        """Run ``config.nrof_epochs`` epochs, evaluating on the test split after each one."""
        for epoch_idx in range(self.config.nrof_epochs):
            self.train_epoch(epoch_idx)
            self.eval(epoch_idx)

    def train_epoch(self, epoch):
        """Run one training step per batch of the training loader and print its loss."""
        batches = self.train_loader.batch_generator()
        for step, (images, targets) in enumerate(batches):
            loss = self.model.train_step(images, targets)
            progress = (f'epoch: [{epoch}/{self.config.nrof_epochs}], '
                        f'iter: [{step}/{self.train_loader.epoch_size}], '
                        f'loss: {loss}')
            print(progress)

    def eval(self, epoch):
        """Print accuracy and loss over the test split, averaged per sample."""
        weighted_hits = []
        weighted_losses = []
        with torch.no_grad():
            for images, targets in self.test_loader.batch_generator():
                outputs = self.model(images)
                n_samples = targets.size(0)
                # Per-batch means are re-weighted by the batch size so batches
                # of different sizes contribute proportionally to the totals.
                weighted_hits.append(accuracy(outputs, targets).item() * n_samples)
                weighted_losses.append(nll(outputs, targets).item() * n_samples)
        n_total = self.test_loader.dataset_size
        accuracy_pct = np.sum(weighted_hits) * 100 / n_total
        mean_loss = np.sum(weighted_losses) / n_total
        print(f'epoch: {epoch}, total accuracy: {accuracy_pct}%, '
              f'total loss: {mean_loss}')


# Script entry point: build a trainer from the project config and run the training loop.
if __name__ == '__main__':
    trainer = Trainer(config)
    trainer.fit()

In [None]:
# /losses/simple_loss.py

# Definition of the objective (loss) function.
def nll(input, target):
    """Return the negated mean of the entries of *input* selected by *target*.

    Args:
        input: 2-D tensor with one row per sample (presumably per-class
            log-probabilities; confirm against the caller).
        target: 1-D integer tensor holding the column to pick in each row.

    Returns:
        A scalar tensor, ``-mean(input[i, target[i]])``.
    """
    sample_rows = range(target.shape[0])
    picked = input[sample_rows, target]
    return -picked.mean()

In [None]:
# /metrics/accuracy.py
import torch


def accuracy(out, yb):
    """Return the fraction of rows of *out* whose arg-max equals the label in *yb*.

    Args:
        out: 2-D tensor of per-class scores, one row per sample.
        yb: 1-D tensor of expected class indices.

    Returns:
        A scalar float tensor.
    """
    predicted = out.argmax(dim=1)
    hits = (predicted == yb).float()
    return hits.mean()

In [None]:
# /nets/linear_optim_loss.py
import torch
from torch import nn


class LogReg(nn.Module):
    """Single linear layer applied to flattened inputs.

    Args:
        input_dim: number of features per sample after flattening.
        output_dim: number of outputs per sample.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.lin = nn.Linear(in_features=input_dim, out_features=output_dim)

    def forward(self, xb):
        """Flatten every sample of *xb* to one row and apply the linear layer."""
        n_samples = xb.size(0)
        flattened = xb.view(n_samples, -1)
        return self.lin(flattened)


class SimpleNet:
    """Logistic-regression model bundled with its SGD optimizer and cross-entropy loss.

    Args:
        config: object exposing ``input_size``, ``output_size`` and ``lr``.
    """

    def __init__(self, config):
        self.model = LogReg(config.input_size, config.output_size)
        self.optim = torch.optim.SGD(self.model.parameters(), lr=config.lr)
        self.loss_function = torch.nn.CrossEntropyLoss()

    def __call__(self, xb):
        """Forward pass: return the wrapped model's output for *xb*."""
        outputs = self.model(xb)
        return outputs

    def train_step(self, xb, yb):
        """Run one optimization step on the batch ``(xb, yb)`` and return its loss tensor."""
        # Clear gradients left over from the previous step before accumulating new ones.
        self.optim.zero_grad()
        batch_loss = self.loss_function(self(xb), yb)
        batch_loss.backward()
        self.optim.step()
        return batch_loss

In [None]:
# /nets/simple_net.py
import torch
import numpy as np

from losses.simple_loss import nll

class SimpleNet:
    """Softmax regression (one linear layer) trained with hand-written SGD.

    Args:
        config: object exposing ``input_size``, ``output_size`` and ``lr``.
    """

    def __init__(self, config):
        # Random weights scaled by 1 / sqrt(input_size); the bias starts at zero.
        self.weights = torch.randn(config.input_size, config.output_size) / np.sqrt(config.input_size)
        self.weights.requires_grad_()
        self.bias = torch.zeros(config.output_size, requires_grad=True)
        self.lr = config.lr

    def log_softmax(self, x):
        """Return the log-softmax of *x* over its last dimension.

        Uses ``logsumexp`` instead of ``exp().sum().log()`` so that large
        logits do not overflow to ``inf`` and yield non-finite outputs.
        """
        return x - x.logsumexp(-1, keepdim=True)

    # forward pass
    def __call__(self, xb):
        """Return log-probabilities for the batch *xb* (one row per sample)."""
        return self.log_softmax(xb @ self.weights + self.bias)

    def train_step(self, xb, yb):
        """Run one SGD step on the batch ``(xb, yb)`` and return its loss tensor."""
        pred = self(xb)
        loss = nll(pred, yb)

        loss.backward()
        # The parameter update has to happen inside torch.no_grad() so that it
        # is not recorded and does not affect the gradients of the next step.
        with torch.no_grad():
            self.weights -= self.weights.grad * self.lr
            self.bias -= self.bias.grad * self.lr
            # Reset the gradients afterwards: loss.backward() adds to whatever
            # is already stored instead of replacing it, so stale gradients
            # would otherwise accumulate across iterations.
            self.weights.grad.zero_()
            self.bias.grad.zero_()
        return loss