# Wprowadzenie do sieci neuronowych i uczenia maszynowego

## Lab: Własne moduły w PyTorch, regularyzacja i autoenkodery

---

**Autorzy materiałów:** Marek Wydmuch, Iwo Błądek, Jakub Bednarek<br>

---


Ćwiczenia wykonane przez: \
Wojciech Kot 151879 \
Julia Samp 151775 \

---

## Uwaga

* **Aby wykonać polecenia należy najpierw przejść do trybu 'playground'. File -> Open in Playground Mode**
* Nowe funkcje Colab pozwalają na autouzupełnianie oraz czytanie dokumentacji.


## Cel ćwiczeń:

- zapoznanie się z tworzeniem własnych modułów w PyTorch
- wykorzystanie podstawowych mechanizmów regularyzacji: Dropout i Batch normalization

In [None]:
import numpy as np
import torch

print(f"PyTorch version: {torch.__version__}")
print(f"CUDA available: {torch.cuda.is_available()}")

In [None]:
import numpy as np
import torch
from torch.utils.data import TensorDataset, DataLoader
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt

train_data = datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=ToTensor()
)

test_data = datasets.MNIST(
    root="data",
    train=False,
    download=True,
    transform=ToTensor()
)

## Własne moduły (warstwy sieci neuronowych) w PyTorch



Na poprzednich zajęciach używaliśmy gotowych modułów reprezentujących warstwy sieci neuronowych by stworzyć główny moduł naszego modułu.

W PyTorch nie ma żadnej hierarchii modułów (jak np. w TensorFlow czy Keras, gdzie API jest podzielone na modele i warstwy). Każdy moduł może używać innych modułów jako swoich komponentów.

Poniżej przykładowa implementacja modułu warstwy liniowej całkowicie od podstaw.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class CustomLayer(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(CustomLayer, self).__init__()
        # Parametry (wagi) naszego modułu
        self.weights = nn.Parameter(torch.Tensor(input_dim, output_dim))
        # Inicjalizacja wag
        nn.init.xavier_normal_(self.weights)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        return torch.mm(x, self.weights)

class CustomModel(nn.Module):
    def __init__(self, num_classes=10):
        super(CustomModel, self).__init__()
        self.layers = nn.Sequential(
            CustomLayer(784, 512),
            nn.ReLU(),
            CustomLayer(512, 512),
            nn.ReLU(),
            CustomLayer(512, num_classes),
            nn.Softmax(dim=1)
        )

    def forward(self, x):
        return self.layers(x)

In [None]:
# Setup naszego modelu
model = CustomModel(num_classes=10).cuda()
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()
train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
test_loader = DataLoader(test_data, batch_size=128)

def accuracy(pred, target):
    return (pred.argmax(1) == target).type(torch.float).sum().item()

# dodane do problemów z zad 6 i device
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# Pętla treningowa i testowa
def train_and_test(
        train_loader,
        test_loader,
        model,
        optimizer,
        criterion,
        metric=None,
        epochs=10,
        verbose=False
    ):
    epochs_history = []
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        train_metric = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data = data.cuda()
            target = target.cuda()
            optimizer.zero_grad()
            pred = model(data)
            loss = criterion(pred, target)
            loss.backward()
            optimizer.step()

            if verbose and batch_idx % 100 == 0:
                print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}')
            train_loss += loss.item() * data.size(0)
            train_metric += metric(pred, target)
        if verbose:
            train_loss /= len(train_loader.dataset)
            train_metric /= len(train_loader.dataset)
            print(f"Train loss: {train_loss:.4f}")
            print(f"Train {metric.__name__}: {train_metric:.4f}")

        model.eval()
        test_metric = 0
        test_loss = 0
        for batch_idx, (data, target) in enumerate(test_loader):
            data = data.cuda()
            target = target.cuda()
            pred = model(data)
            loss = criterion(pred, target)
            test_loss += loss.item() * data.size(0)
            test_metric += metric(pred, target)
        if verbose:
            test_loss /= len(test_loader.dataset)
            test_metric /= len(test_loader.dataset)
            print(f"Test loss: {test_loss:.4f}")
            print(f"Test accuracy: {test_metric:.4f}")
            print("-------------------------------")

        epochs_history.append({
          "epoch": epoch,
          "train_loss": train_loss,
          f"train_{metric.__name__}": train_metric,
          "test_loss": test_loss,
          f"test_{metric.__name__}": test_metric
        })
    return epochs_history

_ = train_and_test(train_loader, test_loader, model, optimizer, criterion, metric=accuracy, epochs=10, verbose=True)

### Zadanie 1

Stwórz prosty model
- warstwy konwolucyjnej (Conv2D): 32 filtry 3x3,
- konwolucyjnej: 64 filtry 3x3,
- warstwy MaxPooling (MaxPooling2D): 2x2
- warstwy ukrytej gęstej (Dense): 128 neuronów,
- warstwy wyjściowej.

Ważne:
- w każdej warstwie poza warstwą wyjściową funkcją aktywacji powinno być relu,
- funkcja aktywacji dla warstwy wyjściowej to softmax,
- między częścią konwolucyjną a gęstą trzeba spłaszczyć tensor przy pomocy warstwy `nn.Flatten`.

In [None]:
class CustomConv2d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, padding=0):
        super(CustomConv2d, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding)
        self.activation = nn.ReLU()

    def forward(self, x):
        x = self.conv(x)
        x = self.activation(x)
        return x

class CustomDense(nn.Module):
    def __init__(self, in_features, out_features):
        super(CustomDense, self).__init__()
        self.linear = nn.Linear(in_features, out_features)
        self.activation = nn.ReLU()

    def forward(self, x):
        x = self.linear(x)
        x = self.activation(x)
        return x

class SimpleModel(nn.Module):
    def __init__(self, num_classes=10):
        super(SimpleModel, self).__init__()
        self.layers = nn.Sequential(
            CustomConv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1),
            CustomConv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Flatten(),
            CustomDense(in_features=64 * 14 * 14, out_features=128),
            nn.Linear(in_features=128, out_features=num_classes),
            nn.Softmax(dim=1)
        )
        self._initialize_weights()

    # Inicjalizacja parametrów, która znacząco poprawi początkowe uczenie
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        return self.layers(x)

model = SimpleModel(num_classes=10).cuda()
optimizer = optim.Adam(model.parameters())
_ = train_and_test(train_loader, test_loader, model, optimizer, criterion, metric=accuracy, epochs=10, verbose=True)

## Zadanie 2

Na podstawie powyższego przykładu stwórz moduł bloku ResNet.
Zadbaj o to by rozmiary tensorów po warstwach konwolucyjnych się nie zmieniały.

![resnet](https://miro.medium.com/max/1000/1*6HDuqhUzP92iXhHoS0Wl3w.png)

Zmodyfikuj model z zadania 1, zamieniając warstwy konwolucyjne na dwa modele bloku ResNet.



In [None]:
class ResNetBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, padding='same'):
        super(ResNetBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size, padding=padding)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size, padding=padding)
        self.relu2 = nn.ReLU()

    def forward(self, x):
        identity = x
        out = self.conv1(x)
        out = self.relu1(out)
        out = self.conv2(out)
        out = out + identity
        out = self.relu2(out)
        return out

class CustomDense(nn.Module):
    def __init__(self, in_features, out_features):
        super(CustomDense, self).__init__()
        self.linear = nn.Linear(in_features, out_features)
        self.activation = nn.ReLU()

    def forward(self, x):
        x = self.linear(x)
        x = self.activation(x)
        return x

class SimpleModel(nn.Module):
    def __init__(self, num_classes=10):
        super(SimpleModel, self).__init__()
        self.layers = nn.Sequential(
            ResNetBlock(in_channels=1, out_channels=1),
            ResNetBlock(in_channels=1, out_channels=1),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Flatten(),
            CustomDense(in_features=1 * 14 * 14, out_features=128),
            nn.Linear(in_features=128, out_features=num_classes),
            nn.Softmax(dim=1)
        )
        self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        return self.layers(x)

model = SimpleModel(num_classes=10).cuda()
optimizer = optim.Adam(model.parameters())
_ = train_and_test(train_loader, test_loader, model, optimizer, criterion, metric=accuracy, epochs=10, verbose=True)


## Regularyzacja

### Zadanie 3

Rozszerz model stworzony w zadaniu 1 o dwie warstwy Dropout (nn.Dropout):
- jedna po warstwie MaxPooling (wartość współczynnika odrzucenia 0.25)
- druga po gęstej warstwie ukrytej (Dense), wartość współczynnika odrzucenia 0.5.
- dodaj opcję włączenia i wyłączenia dropoutu jako argument konstruktora modułu modelu.



In [None]:
class SimpleModel(nn.Module):
    def __init__(self, num_classes=10, dropout_after_pooling=True, dropout_after_dense=True):
        super(SimpleModel, self).__init__()

        layers = [
            CustomConv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1),
            CustomConv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
            nn.MaxPool2d(kernel_size=2, stride=2)
        ]

        if dropout_after_pooling:
            layers.append(nn.Dropout(p=0.25))

        layers.append(nn.Flatten())
        layers.append(CustomDense(in_features=64 * 14 * 14, out_features=128))

        if dropout_after_dense:
            layers.append(nn.Dropout(p=0.5))

        layers.append(nn.Linear(in_features=128, out_features=num_classes))
        layers.append(nn.Softmax(dim=1))

        self.layers = nn.Sequential(*layers)

        self._initialize_weights()

    # Inicjalizacja parametrów, która znacząco poprawi początkowe uczenie
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        return self.layers(x)

model = SimpleModel(num_classes=10).cuda()
optimizer = optim.Adam(model.parameters())
_ = train_and_test(train_loader, test_loader, model, optimizer, criterion, metric=accuracy, epochs=10, verbose=True)


### Zadanie 4
Rozszerz model stworzony w poprzednich zadaniach o dwie warstwy Batch normalization (nn.BatchNorm2d) po warstwach konwolucyjnych. Dodaj opcję włączenia i wyłączenia dropoutu jako argument konstruktora modułu modelu.



In [None]:
class SimpleModel(nn.Module):
    def __init__(self, num_classes=10, dropout_after_pooling=True, dropout_after_dense=True, use_batchnorm=True):
        super(SimpleModel, self).__init__()

        layers = [
            CustomConv2d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        ]

        if use_batchnorm:
            layers.append(nn.BatchNorm2d(32))

        layers.append(CustomConv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1))

        if use_batchnorm:
            layers.append(nn.BatchNorm2d(64))

        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))

        if dropout_after_pooling:
            layers.append(nn.Dropout(p=0.25))

        layers.append(nn.Flatten())
        layers.append(CustomDense(in_features=64 * 14 * 14, out_features=128))

        if dropout_after_dense:
            layers.append(nn.Dropout(p=0.5))

        layers.append(nn.Linear(in_features=128, out_features=num_classes))
        layers.append(nn.Softmax(dim=1))

        self.layers = nn.Sequential(*layers)

        self._initialize_weights()

    # Inicjalizacja parametrów, która znacząco poprawi początkowe uczenie
    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.xavier_normal_(m.weight)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        return self.layers(x)

model = SimpleModel(num_classes=10).cuda()
optimizer = optim.Adam(model.parameters())
_ = train_and_test(train_loader, test_loader, model, optimizer, criterion, metric=accuracy, epochs=10, verbose=True)


### Zadanie 5
Porównaj model bez oraz z różnych kombinacjami technik regularyzacji (z dropoutem ale bez batch norm., bez dropout ale z batch norm., z dropoutem i z batch norm.).
Stwórz cztery wykresy:
- błąd funkcji celu dla zbioru treningowego,
- błąd funkcji celu dla zbioru walidacyjnego,
- trafność klasyfikacji dla zbioru treningowego,
- trafność klasyfikacji dla zbioru walidacyjnego.

In [None]:
import matplotlib.pyplot as plt

def plot_loss_split(history_list, label_list):
    epochs = [entry['epoch'] for entry in history_list[0]]

    plt.figure()
    for history, label in zip(history_list, label_list):
        train_loss = [entry['train_loss'] for entry in history]
        plt.plot(epochs, train_loss, label=f"Train {label}")
    plt.title("Błąd funkcji celu - Zbiór treningowy")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

    plt.figure()
    for history, label in zip(history_list, label_list):
        test_loss = [entry['test_loss'] for entry in history]
        plt.plot(epochs, test_loss, label=f"Test {label}")
    plt.title("Błąd funkcji celu - Zbiór testowy")
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.legend()
    plt.show()

def plot_accuracy_split(history_list, label_list):
    epochs = [entry['epoch'] for entry in history_list[0]]

    plt.figure()
    for history, label in zip(history_list, label_list):
        train_accuracy = [entry['train_accuracy'] for entry in history]
        plt.plot(epochs, train_accuracy, label=f"Train {label}")
    plt.title("Trafność klasyfikacji - Zbiór treningowy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.show()

    plt.figure()
    for history, label in zip(history_list, label_list):
        test_accuracy = [entry['test_accuracy'] for entry in history]
        plt.plot(epochs, test_accuracy, label=f"Test {label}")
    plt.title("Trafność klasyfikacji - Zbiór testowy")
    plt.xlabel("Epochs")
    plt.ylabel("Accuracy")
    plt.legend()
    plt.show()

models = [
    SimpleModel(num_classes=10, dropout_after_dense=False, dropout_after_pooling=False, use_batchnorm=False).cuda(),
    SimpleModel(num_classes=10, use_batchnorm=False).cuda(),
    SimpleModel(num_classes=10, dropout_after_dense=False, dropout_after_pooling=False).cuda(),
    SimpleModel(num_classes=10).cuda()
]

label_list = [
    "Model bez 'dodatków'",
    "Model z Dropoutem (bez BatchNorm)",
    "Model z BatchNorm (bez Dropoutu)",
    "Model z Dropoutem i BatchNorm"
]

history_list = []
for model, name in zip(models, label_list):
    print(f"Trening modelu: {name}")
    optimizer = optim.Adam(model.parameters())
    history = train_and_test(train_loader, test_loader, model, optimizer, criterion, metric=accuracy, epochs=10)
    history_list.append(history)

# Rysowanie wykresów
plot_loss_split(history_list, label_list)
plot_accuracy_split(history_list, label_list)


## Autoenkodery

Ideę autoenkodera prezentuje poniższy rysunek:
![label-autoencoder_schema](https://drive.google.com/uc?export=view&id=1Ai2ER1ppKfnHg5t_lCwO_fvvFNe59dgd)

Widzimy tutaj, że obrazek ze zbioru MNIST o rozmiarze 28 x 28 został skompresowany przez **enkoder** do tensora o rozmiarze 5 x 2. Tensor ten nosi nazwę **wektora zmiennych ukrytych** (ang. latent vector). Następnie **dekoder** przyjął ten wektor na wejście, i odtworzył oryginalny obrazek. Jest to przykład zadania **autoasocjacji**, gdzie celem uczenia sieci neuronowej jest możliwie wierne odtworzenie danych wejściowych. Zadanie to może się wydawać bez sensu w odosobnieniu (po co odtwarzać coś, co już mamy?), jednak to co nas najbardziej interesuje w autoenkoderze to wektor zmiennych ukrytych. Jako że skompresowaliśmy cały obrazek do 10 wartości, to by realistyczne odtworzenie z nich oryginalnego obrazka było możliwe, każda z tych wartości musi 1) zawierać o nim możliwie dużo informacji, 2) nieistotne detale oryginalnego obrazka muszą zostać pominięte. Wyciągnęliśmy więc z danych informacyjną "esencję", pozbyliśmy się redundatnych elementów opisu.

<br>

Najważniejszą cechą autoenkodera jest właśnie **uczenie się efektywnego kodowania danych**, co zazwyczaj wiąże się z redukcją wymiarowości (tzw. 'undercomplete autoencoders'), choć można też uczyć autoenkodery o kodowaniu zwiększającym wymiarowość (tzw. 'overcomplete autoencoders'). Skupimy się na autoenkoderach zmniejszających wymiarowość, bo ich uczenie jest znacznie prostsze. W przeciwieństwie do np. PCA kodowanie uzyskane przez taki autoenkoder może być nieliniowe, tak więc zmienne ukryte mają więcej elastyczności w reprezentacji danych. Selekcja atrybutów, której podstawy omawialiśmy wcześniej na przedmiocie, również redukuje wymiarowość, ale nie zmienia informacji w atrybutach. W ogólności techniki redukcji wymiarowości tworzą zupełnie nowe atrybuty ze starych (np. $Y_1 = 0.5X_1 - 0.25X_2^2 + \log_2 X_3$, gdzie $Y_1$ to nowy atrybut, a $X_i$ to oryginalne atrybuty). Wiele zastosowań autoenkoderów buduje właśnie na tej zdolności, a także na tym, że dzięki procesowi uczenia sieci tworzymy metodę redukcji wymiarowości zoptymalizowaną do konkretnego problemu.

<br>

Autoenkodery mają wiele potencjalnych zastosowań, przykładowo:
* Redukcja wymiarowości - uczymy autoenkoder w trybie autoasocjacji, i naszym celem jest zmniejszenie wymiarowości danych, czyli zastąpienie oryginalnego obiektu jego wektorem zmiennych ukrytych. Można użyć jako alternatywy dla selekcji atrybutów i dedykowanych metod redukcji wymiarowości (PCA, LDA, etc.).
* Grupowanie - uczymy autoenkoder w trybie autoasocjacji, wykorzystujemy wektory zmiennych ukrytych uzyskane dla danych uczących jako wejście do algorytmu grupowania (np. k-means).
* Wyszukiwanie informacji - uczymy autoenkoder w trybie autoasocjacji, wykorzystujemy wektor zmiennych ukrytych jako hasz obiektu. Jeżeli chcemy znaleźć w bazie danych obiekty podobne do zadanego obiektu $X$, to generujemy wektor zmiennych ukrytych (hasz) $X$ przy użyciu enkodera i szukamy obiektów w zbiorze o najbardziej zbliżonych haszach. Technika ta nazywana jest haszowaniem semantycznym.
* Wykrywanie anomalii - uczymy autoenkoder w trybie autoasocjacji wyłącznie na przypadkach "normalnych". Liczymy na to, że jak kiedykolwiek autoenkoder dostanie do przetworzenia przypadek odstający/anomalię, to nie da rady jej dobrze zrekonstruować i błąd będzie wysoki (właśnie przez to, że nie miał szansy się na nich nauczyć).
* Generowanie danych - uczymy autoenkoder w trybie autoasocjacji, a następnie jak chcemy uzyskać różne warianty danego obrazu/obiektu, to modyfikujemy jego wygenerowany przez enkoder wektor zmiennych ukrytych, i dajemy go do przetworzenia dekoderowi. Możemy też po prostu losowo próbkować przestrzeń wektorów ukrytych i obserwować wyniki po przetworzeniu przez dekoder. Przy odrobinie szczęścia jakaś zmienna ukryta może np. odpowiadać za wyraz twarzy człowieka, i zmieniając wartości tej zmiennej możemy zmieniać wyłącznie wyraz twarzy człowieka na zdjęciu. Do tego zadania zazwyczaj wykorzystywany jest zmodyfikowany wariant autoenkodera: autoenkoder wariacyjny.
* Odszumianie - uczymy autoenkoder w trybie heteroasocjacji: *zaszumiony obraz* -> *oryginalny obraz*. Liczymy na to, że enkoder i dekoder nauczą się poprawnie rozpoznawać szum jako element redundantny, nieniosący żadnej informacji.<br>
![label-autoencoder_denoising](https://drive.google.com/uc?export=view&id=1a1fP7CWjzKSwo0txUBUMwaxTrpt6LzyH)



### Zadanie 6

Zaimplementuj autoenkoder o następującej architekturze:
* Enkoder: Dense(100) -> Dense(50) -> Dense(10) (wektor zmiennych ukrytych).
* Dekoder: Dense (50) -> Dense(100) -> Dense(784) (wyjściowy zrekonstruowany obrazek)

Funkcję aktywacji ustaw jako 'relu'. Jako miara błędu użyte zostanie MSE.

In [None]:
# zmiany w funkcji train_and_test, tak aby dostosować ją do autoenkoderów.
# na szczęście Colab i wbudowane Gemini przyszły z pomocą. Mam nadzieję że to jest ok, z racji że nie było to częścią zadania ;)

def train_and_test(
        train_loader,
        test_loader,
        model,
        optimizer,
        criterion,
        metric=None,
        epochs=10,
        verbose=False
    ):
    epochs_history = []
    for epoch in range(epochs):
        model.train()
        train_loss = 0
        train_metric = 0
        for batch_idx, (data, target) in enumerate(train_loader):
            data = data.cuda()
            # target = target.cuda() # This line is causing the issue.
            # For autoencoder, the target is the input itself
            target = data.view(-1, 784).cuda() # Reshape target to match autoencoder output
            optimizer.zero_grad()
            pred = model(data)
            loss = criterion(pred, target)
            loss.backward()
            optimizer.step()

            if verbose and batch_idx % 100 == 0:
                print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}')
            train_loss += loss.item() * data.size(0)
            # train_metric += metric(pred, target) # This line might also cause an issue
            # For autoencoders, accuracy is not a relevant metric. We can skip this line.
        if verbose:
            train_loss /= len(train_loader.dataset)
            # train_metric /= len(train_loader.dataset) # Skip this line as well
            print(f"Train loss: {train_loss:.4f}")
            # print(f"Train {metric.__name__}: {train_metric:.4f}") # Skip this line

        model.eval()
        test_metric = 0
        test_loss = 0
        for batch_idx, (data, target) in enumerate(test_loader):
            data = data.cuda()
            target = data.view(-1, 784).cuda() # Reshape target for test data as well
            pred = model(data)
            loss = criterion(pred, target)
            test_loss += loss.item() * data.size(0)
            # test_metric += metric(pred, target) # Skip this line
        if verbose:
            test_loss /= len(test_loader.dataset)
            # test_metric /= len(test_loader.dataset) # Skip this line
            print(f"Test loss: {test_loss:.4f}")
            # print(f"Test accuracy: {test_metric:.4f}") # Skip this line
            print("-------------------------------")

        epochs_history.append({
          "epoch": epoch,
          "train_loss": train_loss,
          # f"train_{metric.__name__}": train_metric, # Skip this line
          "test_loss": test_loss,
          # f"test_{metric.__name__}": test_metric # Skip this line
        })
    return epochs_history

In [None]:
import torch
import torch.nn as nn
import torch.nn.init as init

class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(784, 100), # Colab tutaj sam już autouzupełnia wszystkie warstwy, więc pewnie bardzo dobrze zna ten notatnik ;)
            nn.ReLU(),
            nn.Linear(100, 50),
            nn.ReLU(),
            nn.Linear(50, 10),
            # nn.ReLU()
        )

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(10, 50),
            nn.ReLU(),
            nn.Linear(50, 100),
            nn.ReLU(),
            nn.Linear(100, 784),
            nn.Sigmoid() #o, a to colab sam z siebie dodał, ciekawe czy powinno tu być!
        )


        self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                init.kaiming_normal_(m.weight, mode='fan_in', nonlinearity='relu')
                init.constant_(m.bias, 0)

    def forward(self, x):
        # Ensure input is flattened
        x = x.view(-1, 784)
        # Encode
        encoded = self.encoder(x)
        # Decode
        decoded = self.decoder(encoded)
        return decoded

# Model creation
model = Autoencoder().cuda()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters())
_ = train_and_test(train_loader, test_loader, model, optimizer, criterion, metric=accuracy, epochs=10, verbose=True)

### Zadanie 7

Użyj zaimplementowanego wyżej autoenkodera by zaobserwować jak zmienia się odtwarzany obraz gdy zmienia się tylko jeden element wektora zmiennych ukrytych. Zadanie wykonywane jest w następujących krokach:
1. Użyj nauczonego enkodera z poprzedniego zadania na przypadku uczącym wybranej cyfry (dowolnej) by uzyskać wektor zmiennych ukrytych.
2. Wybierz konkretną zmienną ukrytą w tym wektorze, np. tę o indeksie 0.
3. W pętli podstawiaj różne wartości do tej zmiennej wektora z pewnym krokiem, i obserwuj obrazki generowane przez dekoder.

In [None]:
example = train_data[0][0]
print("Original image")
plt.imshow(example.reshape((28, 28)), cmap='gray')
plt.show()

x = np.expand_dims(example, 0) # Dodatkowy wymiar na pozycję przypadku w 'batchu', by wymiarowość się zgadzała
# print(x)
x = torch.from_numpy(x).float().cuda()
# Po uczeniu wykonanym w poprzednim zadaniu, mamy dostęp do już nauczonych składowych sieci
x = x.view(-1, 784)  # Spłaszczenie wejścia
latent_vector = model.encoder(x)
latent_vector_np = latent_vector.detach().cpu().numpy()
output = model.decoder(latent_vector).detach().cpu().numpy()
#detach(?)

print("Reconstructed image")
plt.imshow(output.reshape((28, 28)), cmap='gray')
plt.show()

print(f"latent_vector: {latent_vector}")

for e in np.linspace(latent_vector_np.min(), latent_vector_np.max(), 11):
  print(f"e: {e}")

  # Zmień wektor zmiennych ukrytych, ustawiając wartość e w odpowiednim polu wektora
  modified_latent_vector = latent_vector.clone()
  modified_latent_vector[0, 0] = e
  # Użyj dekodera by wygenerować obrazek z nowego wektora zmiennych ukrytych
  generated = model.decoder(modified_latent_vector).detach().cpu().numpy().reshape((28, 28))
  # Pokaż wynikowy obrazek
  plt.imshow(generated, cmap='gray')
  plt.title(f"Latent dim 0 set to {e:.2f}")
  plt.show()





### Zadanie 8

Wykorzystaj kod z poprzednich zadań by nauczyć autoenkoder odszumiania. Parę uwag:
* Musisz zmodyfikować zbiór uczący poprzez dodanie sztucznego szumu. Można to zrobić albo poprzez zmianę wartości kilku losowych pikseli w każdym obrazku, albo dodanie macierzy z małymi losowo generowanymi liczbami do obrazka (do każdego obrazka innej!). Oczekiwaną odpowiedzą podczas uczenia będzie oryginalny obrazek bez szumu.
* Architektura enkodera i dekodera może pozostać bez zmian, ale będziesz musiał ją nauczyć na nowym zbiorze danych, tak więc by nie psuć wyników z zadania nr 6 sugeruję przeklejenie odpowiedniego kodu tutaj.
* Zademonstruj działanie odszumiania poprzez pokazanie przypadku z szumem, a następnie zrekonstruowanego obrazka bez szumu po przetworzeniu przez autoenkoder.

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset

# 1. Tworzenie zbioru danych z dodanym szumem
class NoisyDataset(Dataset):
    def __init__(self, dataset, noise_factor=0.5):
        self.dataset = dataset
        self.noise_factor = noise_factor

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        img, _ = self.dataset[idx]
        img = np.array(img) / 255.0
        noise = self.noise_factor * np.random.randn(*img.shape)
        noisy_img = np.clip(img + noise, 0., 1.)  # Dodajemy szum
        return torch.tensor(noisy_img, dtype=torch.float32), torch.tensor(img, dtype=torch.float32)


# Utwórz zbiór uczący i testowy
# noise_factor = 0.001
noise_factor = 0.01
train_noisy_data = NoisyDataset(train_data, noise_factor=noise_factor)
test_noisy_data = NoisyDataset(test_data, noise_factor=noise_factor)

train_loader = DataLoader(train_noisy_data, batch_size=64, shuffle=True)
test_loader = DataLoader(test_noisy_data, batch_size=64, shuffle=False)

# 2. Definicja architektury autoenkodera
class Autoencoder(nn.Module):
    def __init__(self):
        super(Autoencoder, self).__init__()

        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
        )

        self.decoder = nn.Sequential(
            nn.Linear(32, 64),
            nn.ReLU(),
            nn.Linear(64, 128),
            nn.ReLU(),
            nn.Linear(128, 28 * 28),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = x.view(-1, 28 * 28)  # Spłaszczenie wejścia
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded.view(-1, 1, 28, 28)  # Przywrócenie wymiarów obrazu

# 3. Trening autoenkodera
def train_autoencoder(train_loader, model, optimizer, criterion, epochs=3):
    model.train()
    for epoch in range(epochs):
        train_loss = 0
        for noisy_imgs, clean_imgs in train_loader:
            noisy_imgs, clean_imgs = noisy_imgs.cuda(), clean_imgs.cuda()
            optimizer.zero_grad()
            outputs = model(noisy_imgs)
            loss = criterion(outputs, clean_imgs)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
        print(f"Epoch [{epoch + 1}/{epochs}], Loss: {train_loss / len(train_loader):.4f}")

# Model i optymalizacja
model = Autoencoder().cuda()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Trening
train_autoencoder(train_loader, model, optimizer, criterion, epochs=10)

# 4. Demonstracja działania
model.eval()
with torch.no_grad():
    # Pobierz próbkę ze zbioru testowego
    noisy_img, clean_img = test_noisy_data[0]
    noisy_img = noisy_img.unsqueeze(0).cuda()  # Dodanie wymiaru batcha
    clean_img = clean_img.numpy()

    # Przetwarzanie przez autoenkoder
    denoised_img = model(noisy_img).squeeze(0).cpu().numpy()

    # Wizualizacja
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 3, 1)
    plt.title("Noisy Image")
    plt.imshow(noisy_img.squeeze().cpu().numpy(), cmap='gray')
    plt.axis('off')

    plt.subplot(1, 3, 2)
    plt.title("Original Image")
    plt.imshow(clean_img.squeeze(), cmap='gray')
    plt.axis('off')

    plt.subplot(1, 3, 3)
    plt.title("Denoised Image")
    plt.imshow(denoised_img.squeeze(), cmap='gray')
    plt.axis('off')

    plt.show()

