# Lenet Implementation

In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader


def main():
    batch_size = 32
    cifar_train = datasets.CIFAR10('cifar', True, transform=transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor()
    ]), download=True)
    cifar_train = DataLoader(cifar_train, batch_size=batch_size, shuffle=True)

    cifar_test = datasets.CIFAR10('cifar', False, transform=transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor()
    ]), download=True)
    cifar_test = DataLoader(cifar_test, batch_size=batch_size, shuffle=True)

    x, label = iter(cifar_train).next()
    print("x:", x.shape, 'label:', label.shape)


if __name__ == '__main__':
    main()


In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch import nn
from torch.nn import functional as F


class Lenet5(nn.Module):
    """
    for cifar10 dataset
    """

    def __init__(self):
        super(Lenet5, self).__init__()

        self.conv_unit = nn.Sequential(
            # x: [b,3,32,32] ==> [b,6,x,x]
            nn.Conv2d(3, 6, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
            #
            nn.Conv2d(6, 16, kernel_size=5, stride=1, padding=0),
            nn.AvgPool2d(kernel_size=2, stride=2, padding=0),
        )
        # ==> [b,16,5,5]
        # to\do: flatten
        # fc_unit
        self.fc_unit = nn.Sequential(
            nn.Linear(16*5*5, 120),
            nn.ReLU(),
            nn.Linear(120, 84),
            nn.ReLU(),
            nn.Linear(84, 10)
        )

        # # [b,3,32,32]
        # tmp = torch.randn(2, 3, 32, 32)
        # out = self.conv_unit(tmp)
        # print("conv out: ", out.shape)

        # use Cross Entropy Loss
        # self.criteon = nn.CrossEntropyLoss()

    def forward(self, x):
        """
        : param x: [b,3,32,32]
        : return:
        """
        batchsz = x.size(0)
        # [b,3,32,32] ==> [b,16,5,5]
        x = self.conv_unit(x)
        # [b,16,5,5] ==> [b,16*5*5]
        x = x.view(batchsz, 16*5*5)
        # [b,16*5*5] ==> [b,10]
        logtis = self.fc_unit(x)

        # pred = F.softmax(logtis, dim=1)
        # loss = self.criteon(logtis, y)
        return logtis


def main():
    net = Lenet5()

    # [b,3,32,32]
    tmp = torch.randn(2, 3, 32, 32)
    out = net(tmp)
    print("lenet out: ", out.shape)


if __name__ == "__main__":
    main()


In [3]:
# train
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch import nn, optim
from torch.nn import functional as F


def main():
    batch_size = 32
    cifar_train = datasets.CIFAR10('cifar', True, transform=transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor()
    ]), download=True)
    cifar_train = DataLoader(cifar_train, batch_size=batch_size, shuffle=True)

    cifar_test = datasets.CIFAR10('cifar', False, transform=transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor()
    ]), download=True)
    cifar_test = DataLoader(cifar_test, batch_size=batch_size, shuffle=True)

    x, label = iter(cifar_train).next()
    print("x:", x.shape, 'label:', label.shape)

    device = torch.device('cuda')
    model = Lenet5().to(device=device)
    print(model)
    criteon = nn.CrossEntropyLoss().to(device=device)
    optimizer = optim.Adam(model.parameters(), lr=1e-4)
    for epoch in range(1000):
        model.train()
        for batchidx, (x, label) in enumerate(cifar_train):
            # x [b,3,32,32] ==>
            # label  [b]
            x, label = x.to(device), label.to(device)
            logits = model(x)
            # logits: [b,10]
            # label: [b]
            loss = criteon(logits, label)

            # backprop
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        #
        print(epoch, loss.item())

        model.eval()
        with torch.no_grad():
            # test
            for x, label in cifar_test:
                # [b,3,32,32]
                # [b]
                x, label = x.to(device), label.to(device)

                # [b,10]
                total_correct = 0
                total_num = 0
                logits = model(x)
                pred = logits.argmax(dim=1)
                # [b] vs [b] ==> scalar
                total_correct += torch.eq(pred, label).float().sum().item()
                total_num += x.size(0)

            acc = total_correct/total_num

            print(epoch, acc)


if __name__ == "__main__":
    main()

# the model are to be promoted


Files already downloaded and verified
Files already downloaded and verified
x: torch.Size([32, 3, 32, 32]) label: torch.Size([32])
Lenet5(
  (conv_unit): Sequential(
    (0): Conv2d(3, 6, kernel_size=(5, 5), stride=(1, 1))
    (1): AvgPool2d(kernel_size=2, stride=2, padding=0)
    (2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
    (3): AvgPool2d(kernel_size=2, stride=2, padding=0)
  )
  (fc_unit): Sequential(
    (0): Linear(in_features=400, out_features=120, bias=True)
    (1): ReLU()
    (2): Linear(in_features=120, out_features=84, bias=True)
    (3): ReLU()
    (4): Linear(in_features=84, out_features=10, bias=True)
  )
)
0 2.184600591659546
0 0.25
1 2.1123101711273193
1 0.6875
2 1.0950274467468262
2 0.4375
3 1.6914961338043213
3 0.4375
4 1.7185277938842773
4 0.3125
5 1.5688220262527466
5 0.3125
6 1.5834250450134277
6 0.4375
7 1.991105556488037
7 0.5
8 1.3813546895980835
8 0.4375
9 1.62240469455719
9 0.4375
10 2.0080559253692627
10 0.625
11 1.1276460886001587
11 0.5
12 1.334

# ResNet Implementation

In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from torch import nn, optim
from torch.nn import functional as F


class ResBlk(nn.Module):
    """
    resnet block
    """

    def __init__(self, in_ch, out_ch):
        """
        :param in_ch:
        :param out_ch
        """
        super(ResBlk, self).__init__()
        self.conv1 = nn.Conv2d(
            in_ch, out_ch, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.conv2 = nn.Conv2d(
            out_ch, out_ch, kernel_size=3, stride=1, padding=1)
        self.bn2 = nn.BatchNorm2d(out_ch)

        self.extra = nn.Sequential()
        if out_ch != in_ch:
            # [b,in_ch,h,w] ==> [b,out_ch,h,w]
            self.extra = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, kernel_size=1, stride=1),
                nn.BatchNorm2d(out_ch)
            )

    def forward(self, x):
        """
        :param x: [b,ch,h,w]
        :return
        """
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        # short cut
        # element-wise add: [b,ch,h,w]
        out = self.extra(x) + out

        return out


class ResNet18(nn.Module):
    def __init__(self):
        super(ResNet18, self).__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64)
        )
        # followed 4 blocks
        # [b,64,h,w] ==> [b,128,h,w]
        self.blk1 = ResBlk(64, 128)
        # [b,128,h,w] ==> [b,256,h,w]
        self.blk2 = ResBlk(128, 256)
        # [b,256,h,w] ==> [b,512,h,w]
        self.blk3 = ResBlk(256, 512)
        # [b,512,h,w] ==> [b,1024,h,w]
        self.blk4 = ResBlk(512, 1024)

    def forward(self, x):
        """
        :param x:
        :return:
        """
        x = F.relu(self.conv1(x))
