W ćwiczeniu 6 należy zbudować sieć wykorzystującą warstwy CNN i operator max
pooling. Warstwa CNN dostępna jest w torch.nn. Conv2d:
https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html
Warstwa przyjmuje 4-wymiarowe wejścia (batch, kanały obrazka, wysokość,
szerokość). Dla obrazków czarno-białych liczba kanałów wejścia będzie równa 1,
liczba kanałów wyjściowych powinna być większa. Po przejściu przez jedną lub więcej
warstw konwolucyjnych, wyjście trzeba spłaszczyć i podać do warstwy liniowej, dalej
skorzystać ze standardowej dla klasyfikacji funkcji kosztu.
Do implementacji warstwy liniowej można skorzystać z LazyLinear, która
automatycznie ustala wymiarowość przy pierwszym przejściu danych.
Do przebadania będzie
• Liczba kanałów wyjściowych warstwy konwolucyjnej
• Rozmiar filtra warstwy konwolucyjnej
• Rozmiar okna poolingu
• Zaburzenia danych: dane można zaburzyć dodając do wejściowego batcha
batch o tych samych wymiarach, wygenerowany jako szum gaussowski o
różnych odchyleniach. Przebadać scenariusze: szum dodany w danych
testowych vs szum dodany zarówno w testowych, jak i treningowych.
Ćwiczenie oceniane jest w skali 0-10 pkt, na jego wykonanie są 2 tygodnie.

In [None]:
import time
from torchvision import transforms, datasets
import torch
from torch.utils.data import Subset
from torch import nn
from torch import optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.init as init
import random
from sklearn.metrics import precision_score, accuracy_score, f1_score
from torchvision.transforms import GaussianBlur
from torch.autograd import Variable
from torch.nn import LazyLinear

default_batch_size = 100

seed = 42

random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)


# Define a transform to normalize the data
transform = transforms.Compose([transforms.ToTensor(),
                                 transforms.Normalize((0.5,), (0.5,))])

transform_blur = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
    GaussianBlur(kernel_size=9, sigma=(1.5, 5.0))
])

#how to get train and test data
train_data = datasets.FashionMNIST('path', download=True, train=True, transform=transform)
test_data = datasets.FashionMNIST('path', download=True, train=False, transform=transform)

# use only 5% of train data
train_indices = list(range(len(train_data)))
random.shuffle(train_indices)
train_indices_5_percent = train_indices[:len(train_indices)//20]

test_indices = list(range(len(test_data)))
random.shuffle(test_indices)
test_indices_5_percent = test_indices[:len(test_indices)//20]
# dont use subset for now
train_data_5_percent = Subset(train_data, train_indices_5_percent)
test_data_5_percent = Subset(test_data, test_indices_5_percent)

print('100%: ',len(train_data))
print('100%: ',len(test_data))
print('5%: ',len(train_data_5_percent))
print('5%: ',len(test_data_5_percent))
# print out rest of configuration



train_loader = torch.utils.data.DataLoader(train_data_5_percent, batch_size=default_batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data_5_percent, batch_size=default_batch_size, shuffle=True)

#shoow first input shape    
print(next(iter(train_loader))[0].shape)





def train(model, criterion, data_loader, test_loader, epochs):
    #reset model
    model.apply(init_normal)
    optimizer = optim.Adam(model.parameters())
    torch.manual_seed(seed)

    start_timestamp = time.time()
    training_loss = []
    test_loss_list = []

    for epoch in range(epochs):
        running_loss = 0
        test_loss = 0
        with torch.no_grad():
            for images, labels in test_loader:
                images = Variable(images.view(-1, 1, 28, 28))
                labels = Variable(labels)
                #forward pass
                logits = model(images)
                loss_test = criterion(logits, labels)
                test_loss += loss_test.item()

        for images, labels in data_loader:
            images = Variable(images.view(-1, 1, 28, 28))
            labels = Variable(labels)
            logits = model(images)
            loss = criterion(logits, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        running_loss = running_loss/len(data_loader)
        test_loss = test_loss/len(test_loader)
        training_loss.append(running_loss) 
        test_loss_list.append(test_loss)

        if (epoch) % 10 == 0:
            print(f"Epoch {epoch}/{epochs} - Train Loss: {running_loss:.4f}, Test Loss: {test_loss:.4f}")

    print(f"\nTraining Time (in seconds) = {(time.time()-start_timestamp):.2f}")
    return training_loss, test_loss_list

def plot_loss(losses, title):
    train, test = losses
    plt.plot(train)
    plt.plot(test)
    plt.legend(['Train' ,'Test'])
    plt.title(title)
    plt.ylabel('Loss')
    plt.xlabel('Epoch')
    plt.ylim(0, 3)
    plt.show()




class CustomCNN(nn.Module):
    def __init__(self, num_channels, kernel_size, pool_size):
        super(CustomCNN, self).__init__()

        # Convolutional layer with adjustable parameters
        self.conv_block = nn.Sequential(
            nn.Conv2d(1, num_channels, kernel_size=kernel_size, stride=1, padding=1),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size=pool_size, stride=2)
        )

        # Flatten and fully connected layers
        self.fc_block = nn.Sequential(
            nn.Flatten(),
            LazyLinear(num_channels * 14 * 14),
            nn.LeakyReLU(),
            LazyLinear(128),
            nn.LeakyReLU(),
            LazyLinear(10),
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        x = self.conv_block(x)
        x = self.fc_block(x)
        return x



def init_normal(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        torch.manual_seed(seed)
        if hasattr(m, 'weight'):
            nn.init.xavier_normal_(m.weight)
        if hasattr(m, 'bias'):
            nn.init.constant_(m.bias, 0)


# Define the loss
criterion = nn.CrossEntropyLoss();

epochs = 100

def evaluate_model(model, test_loader):
    model.eval()
    total = 0
    correct = 0
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.view(-1, 1, 28, 28)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            all_labels.extend(labels.numpy())
            all_predictions.extend(predicted.numpy())

    accuracy = correct / total
    precision = precision_score(all_labels, all_predictions, average='weighted')
    f1 = f1_score(all_labels, all_predictions, average='weighted')

    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'F1 Score: {f1:.4f}')





In [None]:
import time
from torchvision import transforms, datasets
import torch
from torch.utils.data import Subset
from torch import nn
from torch import optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
import torch.nn.init as init
import random
from sklearn.metrics import precision_score, accuracy_score, f1_score
from torchvision.transforms import GaussianBlur
from torch.autograd import Variable
from torch.nn import LazyLinear

default_batch_size = 200

seed = 42

random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)


# Define a transform to normalize the data
transform = transforms.Compose([transforms.ToTensor(),
                                 transforms.Normalize((0.5,), (0.5,))])

transform_blur = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,)),
    GaussianBlur(kernel_size=9, sigma=(1.5, 5.0))
])

#how to get train and test data
train_data = datasets.FashionMNIST('path', download=True, train=True, transform=transform)
test_data = datasets.FashionMNIST('path', download=True, train=False, transform=transform)

#train_data with blur
train_data_noise = datasets.FashionMNIST('path', download=True, train=True, transform=transform_blur)
test_data_noise = datasets.FashionMNIST('path', download=True, train=False, transform=transform_blur)

# use only 10% of train data
train_indices = list(range(len(train_data)))
random.shuffle(train_indices)
train_indices_10_percent = train_indices[:len(train_indices)//10]

test_indices = list(range(len(test_data)))
random.shuffle(test_indices)
test_indices_10_percent = test_indices[:len(test_indices)//10]
# dont use subset for now
train_data_10_percent = Subset(train_data, train_indices_10_percent)
test_data_10_percent = Subset(test_data, test_indices_10_percent)

train_data_10_percent_noisy = Subset(train_data_noise, train_indices_10_percent)
test_data_10_percent_noisy = Subset(test_data_noise, test_indices_10_percent)

print('100%: ',len(train_data))
print('100%: ',len(test_data))
print('10%: ',len(train_data_10_percent))
print('10%: ',len(test_data_10_percent))
# print out rest of configuration


train_loader = torch.utils.data.DataLoader(train_data, batch_size=default_batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=default_batch_size, shuffle=True)

train_loader_noisy = torch.utils.data.DataLoader(train_data_10_percent_noisy, batch_size=default_batch_size, shuffle=True)
test_loader_noisy = torch.utils.data.DataLoader(test_data_10_percent_noisy, batch_size=default_batch_size, shuffle=True)

#shoow first input shape
print(next(iter(train_loader))[0].shape)


def plot_loss(losses, title):
    train, test = losses
    plt.plot(train)
    plt.plot(test)
    plt.legend(['Train' ,'Test'])
    plt.title(title)
    plt.ylabel('Loss')
    plt.xlabel('Batch')
    plt.ylim(0, 3)
    plt.show()

#every batch take batch loss
def train(model, criterion, data_loader, test_loader, epochs):
    # Reset model
    model.apply(init_normal)
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
    torch.manual_seed(seed)

    start_timestamp = time.time()
    training_loss = []
    test_loss_list = []

    for epoch in range(epochs):
        running_loss = 0
        test_loss = 0

        # Training phase
        model.train()
        for batch_idx, (images, labels) in enumerate(data_loader):
            images = Variable(images.view(-1, 1, 28, 28))
            labels = Variable(labels)
            logits = model(images)
            loss = criterion(logits, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss = loss.item()

            # Print training loss for every batch
            # print(f"Epoch {epoch}/{epochs} - Batch {batch_idx}/{len(data_loader)} - Train Loss: {loss.item():.4f}")

            training_loss.append(running_loss)

            # Testing phase
            model.eval()
            with torch.no_grad():
                for batch_idx, (images, labels) in enumerate(test_loader):
                    images = Variable(images.view(-1, 1, 28, 28))
                    labels = Variable(labels)
                    logits = model(images)
                    loss_test = criterion(logits, labels)
                    test_loss = loss_test.item()

                # Print test loss for every batch
                # print(f"Epoch {epoch}/{epochs} - Batch {batch_idx}/{len(test_loader)} - Test Loss: {loss_test.item():.4f}")

            test_loss_list.append(test_loss)

        print(f"Epoch {epoch}/{epochs} - Total Train Loss: {running_loss:.4f}, Total Test Loss: {test_loss:.4f}")

    print(f"\nTraining Time (in seconds) = {(time.time()-start_timestamp):.2f}")
    return training_loss, test_loss_list


class CustomCNN(nn.Module):
    def __init__(self, num_channels, kernel_size, pool_size):
        super(CustomCNN, self).__init__()

        self.conv_block = nn.Sequential(
            nn.Conv2d(1, num_channels, kernel_size=kernel_size),
            nn.LeakyReLU(),
            nn.MaxPool2d(kernel_size=pool_size),
        )

        self.fc_block = nn.Sequential(
            nn.Flatten(),
            LazyLinear(120),
            nn.Dropout2d(0.5),
            nn.LeakyReLU(),
            LazyLinear(10),
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        x = self.conv_block(x)
        x = self.fc_block(x)
        return x



def init_normal(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        torch.manual_seed(seed)
        if hasattr(m, 'weight'):
            nn.init.xavier_normal_(m.weight)
        if hasattr(m, 'bias'):
            nn.init.constant_(m.bias, 0)


# Define the loss
criterion = nn.CrossEntropyLoss();

epochs = 5

def evaluate_model(model, test_loader):
    model.eval()
    total = 0
    correct = 0
    all_labels = []
    all_predictions = []

    with torch.no_grad():
        for images, labels in test_loader:
            images = images.view(-1, 1, 28, 28)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            all_labels.extend(labels.numpy())
            all_predictions.extend(predicted.numpy())

    accuracy = correct / total
    precision = precision_score(all_labels, all_predictions, average='weighted')
    f1 = f1_score(all_labels, all_predictions, average='weighted')

    print(f'Accuracy: {accuracy:.4f}')
    print(f'Precision: {precision:.4f}')
    print(f'F1 Score: {f1:.4f}')





In [None]:
num_channels = 16  # You can adjust this value
kernel_size = 3    # You can adjust this value
pool_size = 2      # You can adjust this value
#pass a dummy tensor to the model
#Each value is the darkness of the pixel (1 to 255)
dummy_tensor = torch.rand(1, 1, 28, 28)



cnn_model = CustomCNN(num_channels, kernel_size, pool_size)
cnn_model(dummy_tensor)

cnn_model.apply(init_normal)

print(cnn_model)

# train model
cnn_losses = train(
    cnn_model,
    criterion,
    train_loader,
    test_loader,
    epochs=epochs)

evaluate_model(cnn_model, test_loader)
plot_loss(cnn_losses, f'cnn_model num_channels = {num_channels} kernel_size = {kernel_size} pool_size = {pool_size}')

#try changinf channel size
num_channels = 32  # You can adjust this value
kernel_size = 3    # You can adjust this value
pool_size = 2      # You can adjust this value

cnn_model = CustomCNN(num_channels, kernel_size, pool_size)
cnn_model(dummy_tensor)
cnn_model.apply(init_normal)

print(cnn_model)

# train model
cnn_losses = train(
    cnn_model,
    criterion,
    train_loader,
    test_loader,
    epochs=epochs)

evaluate_model(cnn_model, test_loader)
plot_loss(cnn_losses,  f'cnn_model num_channels = {num_channels} kernel_size = {kernel_size} pool_size = {pool_size}')

#change filter size

num_channels = 16  # You can adjust this value
kernel_size = 9   # You can adjust this value
pool_size = 2      # You can adjust this value

cnn_model = CustomCNN(num_channels, kernel_size, pool_size)
cnn_model(dummy_tensor)
cnn_model.apply(init_normal)

print(cnn_model)

# train model
cnn_losses = train(
    cnn_model,
    criterion,
    train_loader,
    test_loader,
    epochs=epochs)

evaluate_model(cnn_model, test_loader)
plot_loss(cnn_losses,  f'cnn_model num_channels = {num_channels} kernel_size = {kernel_size} pool_size = {pool_size}')

# change pool size

num_channels = 16  # You can adjust this value
kernel_size = 3    # You can adjust this value
pool_size = 4      # You can adjust this value

cnn_model = CustomCNN(num_channels, kernel_size, pool_size)
cnn_model(dummy_tensor)
cnn_model.apply(init_normal)

print(cnn_model)

# train model
cnn_losses = train(
    cnn_model,
    criterion,
    train_loader,
    test_loader,
    epochs=epochs)

evaluate_model(cnn_model, test_loader)

plot_loss(cnn_losses,  f'cnn_model num_channels = {num_channels} kernel_size = {kernel_size} pool_size = {pool_size}')

num_channels = 16  # You can adjust this value
kernel_size = 3    # You can adjust this value
pool_size = 2      # You can adjust this value

cnn_model = CustomCNN(num_channels, kernel_size, pool_size)
cnn_model(dummy_tensor)
cnn_model.apply(init_normal)

print(cnn_model)

# train model
cnn_losses = train(
    cnn_model,
    criterion,
    train_loader,
    test_loader_noisy,
    epochs=epochs)

evaluate_model(cnn_model, test_loader_noisy)

plot_loss(cnn_losses,  f'noisy_test num_channels = {num_channels} kernel_size = {kernel_size} pool_size = {pool_size}')


# change data train loader to noisy

num_channels = 16  # You can adjust this value
kernel_size = 3    # You can adjust this value
pool_size = 2      # You can adjust this value

cnn_model = CustomCNN(num_channels, kernel_size, pool_size)
cnn_model(dummy_tensor)
cnn_model.apply(init_normal)

print(cnn_model)

# train model
cnn_losses = train(
    cnn_model,
    criterion,
    train_loader_noisy,
    test_loader,
    epochs=epochs)

evaluate_model(cnn_model, test_loader)

plot_loss(cnn_losses,  f'noisy_all num_channels = {num_channels} kernel_size = {kernel_size} pool_size = {pool_size}')