This tutorial is based on PyTorch's tutorial: https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html and contains the code snippets from it:  
- device test
- transform, trainset, trainloader, testset, testloader, classes
- training, evaluating, and visualizing routines

The license of the original tutorial is the 3-Clause BSD License.  
See LICENSE for details.


In [None]:
import torch
import torchvision
import torchvision.transforms as transforms
import os

import sys
sys.path.append('/content/drive/My Drive/Colab Notebooks/b3_proj_2022/MyModules')
import util


In [None]:
# Select the compute device: the first CUDA GPU when PyTorch can see one,
# otherwise the CPU.
# To monitor the server's GPU installation and usage, log in to the server and
# run `nvidia-smi`; it lists the GPUs online and their utilization.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# On a CUDA machine this should print a CUDA device.
print(device)




In [None]:
# --- Experiment configuration (read as globals by the cells below) ---

# Number of passes over the training set.
epochs = 15

# Mini-batch sizes for the training and test data loaders.
batch_size_train = batch_size_test = 128

# How many test images to display in the final visualisation cell.
num_shown_images = 8

# Side length (pixels) the images are resized to before entering the network.
input_size = 32
# input_size = 64

# Prefix used when naming TensorBoard runs and saved model files.
study_name = "exercise04_01_st01"

In [None]:
# Per-channel mean / std handed to Normalize for both splits.
# NOTE(review): these differ from the commonly quoted CIFAR-10 statistics;
# confirm where they come from before changing them.
_NORM_MEAN = (0.424, 0.415, 0.384)
_NORM_STD = (0.283, 0.278, 0.284)

# Directory the CIFAR-10 archives are downloaded to / read from.
_DATA_ROOT = '/content/drive/My Drive/Colab Notebooks/b3_proj_2022/data'

# Training pipeline: resize, then augment (padded random crop + horizontal
# flip), then convert to a normalised tensor.
transform_train = transforms.Compose([
    transforms.Resize(input_size),
    transforms.RandomCrop(input_size, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
])

# Test pipeline: same resize and normalisation, no augmentation.
transform_test = transforms.Compose([
    transforms.Resize(input_size),
    transforms.ToTensor(),
    transforms.Normalize(_NORM_MEAN, _NORM_STD),
])

trainset = torchvision.datasets.CIFAR10(
    root=_DATA_ROOT, train=True, download=True, transform=transform_train)
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=batch_size_train, shuffle=True, num_workers=2)

testset = torchvision.datasets.CIFAR10(
    root=_DATA_ROOT, train=False, download=True, transform=transform_test)
testloader = torch.utils.data.DataLoader(
    testset, batch_size=batch_size_test, shuffle=False, num_workers=2)

# Human-readable label for each class index, used when printing results.
classes = ('plane', 'car', 'bird', 'cat',
           'deer', 'dog', 'frog', 'horse', 'ship', 'truck')
num_classes = len(classes)

In [None]:
import torch.nn as nn
from torch.autograd import Function

# A quantization function that emulates int8 (-128..127) in float expression
class Int8InFloatFunction(Function):
    """Round to the nearest integer and saturate to the int8 range, kept as float.

    ``round`` has zero gradient almost everywhere, so the backward pass uses a
    straight-through estimator: the incoming gradient is passed unchanged for
    inputs inside the representable range and zeroed for inputs that saturated.
    """

    @staticmethod
    def forward(ctx, input):
        """Return ``clamp(round(input), -128, 127)`` and remember ``input``."""
        ctx.save_for_backward(input)
        return torch.clamp(torch.round(input), -128, 127)

    @staticmethod
    def backward(ctx, grad_out):
        """Pass ``grad_out`` through where ``-128 <= input <= 127``, else zero.

        NOTE(review): the pass-through window is taken on the un-rounded input;
        confirm against the course manual if a different boundary is expected.
        """
        input = ctx.saved_tensors[0]
        in_range = (input >= -128) & (input <= 127)
        return torch.where(in_range, grad_out, torch.zeros_like(grad_out))


int8_in_float = Int8InFloatFunction.apply


# Its nn.Module wrapper with fraction part bit width
class Int8Float(nn.Module):
    """Fixed-point quantizer: int8 with ``frac_bits`` bits after the binary point.

    The input is scaled by ``2 ** frac_bits``, quantized with ``int8_in_float``
    and scaled back, so outputs are multiples of ``2 ** -frac_bits`` within
    ``[-128, 127] / 2 ** frac_bits``.

    NOTE(review): implemented from the class comment and the ``Int8Float(5)``
    call site; the referenced manual is not part of this file, so confirm the
    intended scaling convention.

    Args:
        frac_bits: number of fractional bits (0 means plain integer rounding).
    """

    def __init__(self, frac_bits=0):
        super(Int8Float, self).__init__()
        self.frac_bits = frac_bits
        # Plain Python float (not a buffer) so the state_dict stays unchanged.
        self.scale = 2.0 ** frac_bits

    def forward(self, input):
        return int8_in_float(input * self.scale) / self.scale

    def extra_repr(self):
        return "frac_bits=%d" % self.frac_bits

In [None]:
import torch.nn as nn
from torch.autograd import Function

# A quantization/activation function that emulates binary sign (+1/-1) in float expression
class BinarySignFunction(Function):
    """Binarise a tensor to +1 / -1 with a straight-through gradient.

    The forward pass maps non-negative values to +1 and negative values to -1
    (zero maps to +1 so the output is always strictly binary). The true
    derivative is zero almost everywhere, so the backward pass passes the
    incoming gradient through where ``|input| <= 1`` and blocks it elsewhere.

    NOTE(review): the clipped straight-through window ``|input| <= 1`` is the
    usual choice for binarised networks; confirm against the course manual.
    """

    @staticmethod
    def forward(ctx, input):
        """Return +1 where ``input >= 0`` and -1 elsewhere; remember ``input``."""
        ctx.save_for_backward(input)
        ones = torch.ones_like(input)
        return torch.where(input >= 0, ones, -ones)

    @staticmethod
    def backward(ctx, grad_out):
        """Pass ``grad_out`` through where ``|input| <= 1``, else zero."""
        input = ctx.saved_tensors[0]
        return torch.where(input.abs() <= 1, grad_out, torch.zeros_like(grad_out))


binary_sign = BinarySignFunction.apply


# Its nn.Module wrapper
class BinarySign(nn.Module):
    """Module form of ``binary_sign`` so it can be placed in ``nn.Sequential``."""

    def __init__(self):
        super(BinarySign, self).__init__()

    def forward(self, input):
        return binary_sign(input)


In [None]:
import torch.nn as nn
import torch.nn.functional as F

# Conv2d subclass with quantizer option
class QuantizedConv2d(nn.Conv2d):
    """``nn.Conv2d`` whose weight is optionally passed through a quantizer.

    Accepts every ``nn.Conv2d`` argument plus the keyword ``quantizer``: a
    callable applied to ``self.weight`` on each forward pass, or ``None``
    (the default) to use the weight unchanged. The bias is never quantized.
    """

    def __init__(self, *args, **kwargs):
        # Strip our own option before delegating to nn.Conv2d, which would
        # reject the unknown keyword.
        weight_quantizer = kwargs.pop("quantizer", None)
        super().__init__(*args, **kwargs)
        self.quantizer = weight_quantizer

    def forward(self, input):
        # Same call as the stock Conv2d.forward
        # (https://pytorch.org/docs/stable/_modules/torch/nn/modules/conv.html),
        # with the weight swapped for its quantized version when requested.
        weight = self.weight
        if self.quantizer is not None:
            weight = self.quantizer(weight)
        return self._conv_forward(input, weight, self.bias)

# Linear subclass with quantizer option
class QuantizedLinear(nn.Linear):
    """``nn.Linear`` whose weight is optionally passed through a quantizer.

    Accepts every ``nn.Linear`` argument plus the keyword ``quantizer``: a
    callable applied to ``self.weight`` on each forward pass, or ``None``
    (the default) to use the weight unchanged. The bias is never quantized.
    """

    def __init__(self, *args, **kwargs):
        # Strip our own option before delegating to nn.Linear, which would
        # reject the unknown keyword.
        quantizer = kwargs.pop("quantizer", None)
        super(QuantizedLinear, self).__init__(*args, **kwargs)
        self.quantizer = quantizer

    def forward(self, input):
        # Same call as the stock Linear.forward
        # (https://pytorch.org/docs/stable/_modules/torch/nn/modules/linear.html),
        # with the weight swapped for its quantized version when requested.
        if self.quantizer is not None:
            return F.linear(input, self.quantizer(self.weight), self.bias)
        return F.linear(input, self.weight, self.bias)
        

In [None]:
import torch.nn as nn
import torch.nn.functional as F
from collections import OrderedDict
import copy

# Utility function for defining a block of conv layers
def conv_block(ich, och, ksize, num_layers, *, bn=True, pool=False, act=None, quant=None, **kwargs):
    """Build the layers of one convolution block as an ordered name -> module map.

    The block is ``num_layers`` QuantizedConv2d layers (each optionally followed
    by BatchNorm2d), then an optional 2x2 max-pool, then an optional activation.
    The activation is added once, at the end of the block.

    Args:
        ich: input channels of the first conv layer.
        och: output channels of every conv layer in the block.
        ksize: kernel size passed to each conv layer.
        num_layers: number of conv layers; must be at least 1.
        bn: add ``nn.BatchNorm2d(och)`` after each conv layer.
        pool: add ``nn.MaxPool2d(2, 2)`` after the conv layers.
        act: activation module to deep-copy into the block, or ``None``.
        quant: weight quantizer; each conv layer gets its own deep copy.
        **kwargs: forwarded to ``QuantizedConv2d`` (e.g. ``padding``, ``bias``).

    Returns:
        OrderedDict whose keys are ``"NN-conv"``, ``"NN-conv-bn"``, ``"NN-pool"``
        and ``"NN-act"`` with a two-digit running index ``NN``.
    """
    assert num_layers >= 1
    layers = OrderedDict()
    in_channels = ich
    for index in range(1, num_layers + 1):
        layers["%02d-conv" % index] = QuantizedConv2d(
            in_channels, och, ksize, quantizer=copy.deepcopy(quant), **kwargs)
        if bn:
            layers["%02d-conv-bn" % index] = nn.BatchNorm2d(och)
        # Every conv layer after the first consumes the block's output width.
        in_channels = och

    # Pool and activation continue the running index after the conv layers.
    next_index = num_layers + 1
    if pool:
        layers["%02d-pool" % next_index] = nn.MaxPool2d(2, 2)
        next_index += 1
    if act is not None:
        layers["%02d-act" % next_index] = copy.deepcopy(act)
    return layers


# Utility function for defining a fully connected (FC) layer
def fc_block(ich, och, *, bn=True, pool=False, act=None, quant=None, **kwargs):
    """Build one fully connected block as an ordered name -> module map.

    Args:
        ich: number of input features.
        och: number of output features.
        bn: add ``nn.BatchNorm1d(och)`` after the linear layer.
        pool: accepted for signature parity with ``conv_block``; not used here.
        act: activation module to deep-copy into the block, or ``None``.
        quant: weight quantizer; the linear layer gets its own deep copy.
        **kwargs: forwarded to ``QuantizedLinear`` (e.g. ``bias``).

    Returns:
        OrderedDict with key ``"01-fc"`` and, when enabled, ``"01-fc-bn"`` and
        ``"02-act"``.
    """
    linear = QuantizedLinear(ich, och, quantizer=copy.deepcopy(quant), **kwargs)
    layers = OrderedDict([("01-fc", linear)])
    if bn:
        layers["01-fc-bn"] = nn.BatchNorm1d(och)
    if act is not None:
        layers["02-act"] = copy.deepcopy(act)
    return layers

# Network definition
class Net(nn.Module):
    """CNN classifier built from quantized conv blocks followed by quantized FC layers.

    The layout is fixed inside ``__init__``: four conv blocks (3->64, 64->128,
    then channel doubling capped at 512, with 2x2 pooling in every block except
    the first) followed by two FC layers. Activations are ``BinarySign`` and
    weights are quantized with ``Int8Float(5)``.

    The FC input width is derived from the module-level ``input_size`` at
    construction time, so ``input_size`` must match the images fed to the net.

    Args:
        num_classes: number of outputs of the final FC layer.
    """

    def __init__(self, num_classes=10):
        super(Net, self).__init__()

        num_conv_blocks = 4
        num_conv_layers_per_block = 2
        num_fc_layers = 2
        fc_hidden_size = 1024  # width of hidden FC layers; only used when num_fc_layers > 1

        act = BinarySign()
        # act = nn.ReLU()

        quant = Int8Float(5)
        # quant = None

        def named(prefix, block):
            """Prefix every layer name of ``block`` with ``prefix/``."""
            return [("%s/%s" % (prefix, k), v) for k, v in block.items()]

        # Options shared by every conv block.
        conv_opts = dict(bn=True, act=act, padding=1, quant=quant, bias=False)

        # Instantiate the first two conv blocks (single conv layer each).
        conv_blocks = []
        conv_blocks.extend(named("conv1", conv_block(3, 64, 3, 1, pool=False, **conv_opts)))
        conv_blocks.extend(named("conv2", conv_block(64, 128, 3, 1, pool=True, **conv_opts)))

        # Instantiate the remaining conv blocks, doubling the channels up to och_max.
        ich = 128
        och_max = 512
        # Bind och before the loop so it is defined even when the loop body
        # never runs (num_conv_blocks < 3); previously that was a NameError.
        och = ich
        for i in range(3, num_conv_blocks + 1):
            och = min(ich * 2, och_max)
            conv_blocks.extend(
                named("conv%d" % i,
                      conv_block(ich, och, 3, num_conv_layers_per_block, pool=True, **conv_opts))
            )
            ich = och

        # Instantiate the FC layers.
        # Every conv block except the first halves the spatial size, hence the
        # shift by (num_conv_blocks - 1).
        fc_blocks = []
        ich = och * (input_size >> (num_conv_blocks - 1)) ** 2
        self.fc_input_size = ich
        i = 1
        for _ in range(1, num_fc_layers):
            fc_blocks.extend(
                named("fc%d" % i,
                      fc_block(ich, fc_hidden_size, bn=True, act=act, quant=quant, bias=False))
            )
            i += 1
            ich = fc_hidden_size
        # The final layer differs: no batch norm, no activation, num_classes outputs.
        fc_blocks.extend(
            named("fc%d" % i,
                  fc_block(ich, num_classes, bn=False, act=None, quant=quant, bias=False))
        )

        # Assemble the model with nn.Sequential.
        self.conv_blocks = nn.Sequential(OrderedDict(conv_blocks))
        self.fc_blocks = nn.Sequential(OrderedDict(fc_blocks))

    def forward(self, x):
        """Run the conv blocks, flatten to ``fc_input_size`` features, run the FC layers."""
        x = self.conv_blocks(x)
        x = x.view(-1, self.fc_input_size)
        x = self.fc_blocks(x)
        return x



In [None]:
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
import datetime

# Training routine
def train(net):
    """Train ``net`` on ``trainloader`` for ``epochs`` epochs and save its weights.

    Uses the module-level ``trainloader``, ``testloader``, ``device``,
    ``epochs`` and ``study_name``. Loss and accuracy are logged to TensorBoard
    every 100 mini-batches, and the test accuracy after every epoch. The state
    dict is saved under ``saved_models/`` even when training is interrupted by
    an exception, because saving happens in a ``finally`` clause.

    Args:
        net: the model to train; expected to already live on ``device``.

    Returns:
        Path of the saved ``.pth`` state-dict file.
    """
    date_str = datetime.datetime.now().strftime("%y%m%d-%H%M%S")
    basename = "%s-%s" % (study_name, date_str)

    print("Starting training for '%s'" % basename)

    lr = 0.000136789
    optimizer = optim.Adam(net.parameters(), lr=lr)

    lr_scheduler = optim.lr_scheduler.StepLR(optimizer, 5, gamma=0.1)

    # One sample batch is needed to trace the graph for TensorBoard.
    # The built-in next() is used because DataLoader iterators no longer
    # provide a .next() method in recent PyTorch releases.
    images, _ = next(iter(trainloader))
    writer = SummaryWriter("/content/drive/My Drive/Colab Notebooks/b3_proj_2022/runs/%s" % basename)
    writer.add_graph(net, images.to(device))

    criterion = nn.CrossEntropyLoss()

    # Save initial state
    util.add_param(writer, net, 0)

    steps_per_epoch = len(trainloader)

    try:
        for epoch in range(epochs):  # loop over the dataset multiple times
            running_loss = 0.0
            net.train()
            for i, data in enumerate(trainloader, 0):
                # get the inputs; data is a list of [inputs, labels]
                inputs, labels = data[0].to(device), data[1].to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward + backward + optimize
                outputs = net(inputs)
                loss = criterion(outputs, labels)
                loss.backward()
                optimizer.step()

                # print statistics (running_loss is the sum over the last 100 batches)
                running_loss += loss.item()
                if i % 100 == 99:
                    train_acc = util.accuracy_batch(outputs, labels)
                    print('[%d, %5d] loss: %.3f, train batch acc: %2d %%' %
                          (epoch + 1, i + 1, running_loss, train_acc))

                    gstep = epoch * steps_per_epoch + i
                    writer.add_scalar('Training/Loss', running_loss, gstep)
                    writer.add_scalar('Training/Accuracy', train_acc, gstep)

                    running_loss = 0.0

            # Advance the learning-rate schedule once per epoch. The scheduler
            # was previously created but never stepped, so the learning rate
            # never decayed.
            lr_scheduler.step()

            # Evaluate intermediate result.
            # Global step of the last batch of this epoch, computed without
            # relying on the inner loop variable.
            gstep = (epoch + 1) * steps_per_epoch - 1
            net.eval()
            with util.IntermediateOutputWriter(writer, net, gstep):
                test_acc = util.accuracy(testloader, net, device=device)
                print('[%d,      ] test acc: %2d %%' %
                      (epoch + 1, test_acc))
            writer.add_scalar('Test/Accuracy', test_acc, gstep)
            util.add_param(writer, net, gstep)

    finally:
        # Runs on normal completion and on interruption alike, so that a
        # partially trained model is still saved.
        print('Finished Training')

        try:
            dirpath = 'saved_models'
            PATH = '%s/%s.pth' % (dirpath, basename)
            os.makedirs(dirpath, exist_ok=True)
            torch.save(net.state_dict(), PATH)
            print("Saved in %s." % PATH)
        finally:
            # Flush pending TensorBoard events and release the log file.
            writer.close()
    return PATH

In [None]:
# Build the network, move it to the selected device (Module.to returns the
# module itself) and train it; PATH is where the trained weights were saved.
net = Net(num_classes).to(device)
PATH = train(net)

In [None]:
# # Fetch the best trial
# best_trial = study.best_trial

# Restore the model from that trial's parameters and trained weights
# net = Net(optuna.trial.FixedTrial(best_trial.params))

net = Net(num_classes)
# map_location lets weights saved on a GPU be loaded on a CPU-only machine.
net.load_state_dict(torch.load(PATH, map_location=device))
# net.load_state_dict(torch.load(best_trial.user_attrs['saved_path']))
net.to(device)
net.eval()

# Take the first few images of the first test batch for a visual check.
# The built-in next() is used because DataLoader iterators no longer provide
# a .next() method in recent PyTorch releases.
images, labels = next(iter(testloader))
images = images[:num_shown_images]
labels = labels[:num_shown_images]

# print images
util.imshow(torchvision.utils.make_grid(images))
print('GroundTruth: ', ' '.join('%5s' % classes[label] for label in labels))

images = images.to(device)
labels = labels.to(device)

# Inference only: no autograd graph is needed.
with torch.no_grad():
    outputs = net(images)
_, predicted = torch.max(outputs, 1)

print('Predicted: ', ' '.join('%5s' % classes[pred] for pred in predicted))
accuracy_per_class, accuracy = util.accuracy_of_classes(num_classes, testloader, net, device=device)
print('Accuracy of the network on the 10000 test images: %d %%' % accuracy)

for i, class_name in enumerate(classes):
    print('Accuracy of %5s : %2d %%' % (class_name, accuracy_per_class[i]))
