In [None]:
import torch.optim as optim
import os
import torch.nn.functional as F
import torch.nn as nn
import torch
import torchvision
import torchvision.transforms as transforms
import torchvision.transforms.functional as fn
from sklearn.metrics import f1_score, precision_score
import matplotlib.pyplot as plt
import numpy as np
# os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# device = "cpu"
print(device)

In [None]:
transform32 = transforms.Compose(
    #  transforms.CenterCrop(10),
    [transforms.ToTensor(),
     #  transforms.RandomHorizontalFlip(p=0.9),
     #  transforms.RandomRotation(degrees=180),
     #  transforms.Grayscale(num_output_channels=1),
     #  transforms.GaussianBlur(kernel_size=501),
     transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
     ])
#  transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

transform16 = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Resize((16, 16)),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

transform8 = transforms.Compose(
    [transforms.ToTensor(),
     transforms.Resize((8, 8)),
     transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])

batch_size = 64
num_workers = 12
shuffle = False

trainset32 = torchvision.datasets.CIFAR10(root='./data', train=True,
                                          download=True, transform=transform32)
trainset16 = torchvision.datasets.CIFAR10(root='./data', train=True,
                                          download=True, transform=transform16)

trainset8 = torchvision.datasets.CIFAR10(root='./data', train=True,
                                         download=True, transform=transform8)

testset32 = torchvision.datasets.CIFAR10(root='./data', train=False,
                                         download=True, transform=transform32)

testset16 = torchvision.datasets.CIFAR10(root='./data', train=False,
                                         download=True, transform=transform16)

testset8 = torchvision.datasets.CIFAR10(root='./data', train=False,
                                        download=True, transform=transform8)

trainloader32 = torch.utils.data.DataLoader(trainset32, batch_size=batch_size,
                                            shuffle=shuffle, num_workers=num_workers)

trainloader16 = torch.utils.data.DataLoader(trainset16, batch_size=batch_size,
                                            shuffle=shuffle, num_workers=num_workers)

trainloader8 = torch.utils.data.DataLoader(trainset8, batch_size=batch_size,
                                           shuffle=shuffle, num_workers=num_workers)

testloader32 = torch.utils.data.DataLoader(testset32, batch_size=batch_size,
                                           shuffle=shuffle, num_workers=num_workers)

testloader16 = torch.utils.data.DataLoader(testset16, batch_size=batch_size,
                                           shuffle=shuffle, num_workers=num_workers)

testloader8 = torch.utils.data.DataLoader(testset8, batch_size=batch_size,
                                          shuffle=shuffle, num_workers=num_workers)

classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

In [None]:


def draw_curve(current_epoch, optimizer_name, loss_name, x_epoch, y_loss):
    x_epoch.append(current_epoch)
    plt.plot(x_epoch, y_loss, 'bo-', label='train')
    # plt.plot(x_epoch, y_loss['val'], 'ro-', label='val')

    if current_epoch == 0:
        plt.legend()
    plt.savefig(os.path.join('./loss_graphs',
                f'train_{optimizer_name}_{loss_name}.jpg'))


def total_accuracy():
    correct = 0
    total = 0
    # since we're not training, we don't need to calculate the gradients for our outputs
    with torch.no_grad():
        for data in testloader32:
            images, labels = data
            images = images.to(device)
            labels = labels.to(device)
            # calculate outputs by running images through the network
            outputs = net(images)
            # the class with the highest energy is what we choose as prediction
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(
        f'Accuracy of the network on the 10000 test images: {100 * correct // total} %')


def class_accuracy():
    # prepare to count predictions for each class
    correct_pred = {classname: 0 for classname in classes}
    total_pred = {classname: 0 for classname in classes}
    # again no gradients needed
    with torch.no_grad():
        for data in testloader32:
            images, labels = data
            images = images.to(device)
            labels = labels.to(device)

            outputs = net(images)
            _, predictions = torch.max(outputs, 1)
            # collect the correct predictions for each class
            for label, prediction in zip(labels, predictions):
                if label == prediction:
                    correct_pred[classes[label]] += 1
                total_pred[classes[label]] += 1

    # print accuracy for each class
    for classname, correct_count in correct_pred.items():
        accuracy = 100 * float(correct_count) / total_pred[classname]
        print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')
# %%


class Net(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(3, 32, 3)
        self.conv2 = nn.Conv2d(32, 32, 3)
        self.conv3 = nn.Conv2d(32, 32, 3)
        self.conv3_bn = nn.BatchNorm2d(32)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout1 = nn.Dropout(0.25)

        self.conv4 = nn.Conv2d(32, 64, 3)
        self.conv5 = nn.Conv2d(64, 64, 3)
        self.conv6 = nn.Conv2d(64, 64, 3)
        self.pool2 = nn.MaxPool2d(2, 2)
        self.dropout2 = nn.Dropout(0.25)
        self.fc1 = nn.Linear(576, 512)
        # self.fc2 = nn.Linear(512, 256)
        self.dropout3 = nn.Dropout(0.5)
        self.fc3 = nn.Linear(512, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.relu(self.conv3(x))
        x = self.conv3_bn(x)

        x = self.pool(x)
        x = self.dropout1(x)
        # print("shape x", x.shape)p
        x = F.relu(self.conv4(x))
        x = F.relu(self.conv5(x))
        x = F.relu(self.conv6(x))
        x = self.pool2(x)
        x = self.dropout2(x)
        x = torch.flatten(x, 1)  # flatten all dimensions except batch
        # print(x.size())
        x = F.relu(self.fc1(x))
        x = self.dropout3(x)
        # x = F.relu(self.fc2(x))
        x = self.fc3(x)
        x = F.log_softmax(x, dim=1)
        return x


net = Net().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [None]:


def train(trainloader, trainset, epochs=20):
    x_epoch = []
    y_loss = []
    for epoch in range(epochs):  # loop over the dataset multiple times

        epoch_loss = 0
        running_loss = 0.0
        for i, data in enumerate(trainloader, 0):
            # get the inputs; data is a list of [inputs, labels]
            inputs, labels = data
            inputs = inputs.to(device)
            inputs = fn.resize(inputs, [32, 32])
            labels = labels.to(device)
            now_batch_size = labels.size()[0]
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # print statistics
            epoch_loss += loss.item() * now_batch_size
            running_loss += loss.item()
            if i % 200 == 199:    # print every 2000 mini-batches
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
                running_loss = 0.0
        epoch_loss = running_loss / len(trainset)
        print(f"Epoch {epoch} loss: {epoch_loss}")
        y_loss.append(epoch_loss)
        draw_curve(epoch, optimizer_name=optimizer.__class__.__name__,
                   loss_name=criterion.__class__.__name__, x_epoch=x_epoch, y_loss=y_loss)


In [None]:
train(trainloader16, trainset16, 40)
print('Finished Training')
# %%
PATH = './cifar_net_train_16.pth'
torch.save(net.state_dict(), PATH)

In [None]:
class_accuracy()
total_accuracy()
# %%
correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for data in testloader32:
        images, labels = data

        images = images.to(device)
        labels = labels.to(device)
        # calculate outputs by running images through the network
        outputs = net(images)
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(
    f'Accuracy of the network on the 10000 test images: {100 * correct // total} %')

In [None]:

images, labels = next(iter(testloader16))
# %%
print(images.shape)
fn.resize(images, [32, 32]).shape