In [2]:
# Imports here
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sb
import cv2
from tqdm import tqdm
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models
from torchviz import make_dot, make_dot_from_trace
from torch.utils.data import DataLoader, Dataset
import os
import torch.nn.functional as F
from torchsummary import summary
from torchvision.datasets import CIFAR10
from torchvision.transforms import transforms
from torch.utils.data import DataLoader
from torch.optim import Adam
from torch.autograd import Variable
import matplotlib.pyplot as plt
import torchvision
import time

In [3]:
# Loading and normalizing the data.
# Define transformations for the training and test sets
transformations = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# CIFAR10 dataset consists of 50K training images. We define the batch size of 10 to load 5,000 batches of images.
batch_size = 10
number_of_labels = 10 

# Create an instance for training. 
# When we run this code for the first time, the CIFAR10 train dataset will be downloaded locally. 
train_set =CIFAR10(root="./data",train=True,transform=transformations,download=True)

# Create a loader for the training set which will read the data within batch size and put into memory.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0)
print("The number of images in a training set is: ", len(train_loader)*batch_size)

# Create an instance for testing, note that train is set to False.
# When we run this code for the first time, the CIFAR10 test dataset will be downloaded locally. 
test_set = CIFAR10(root="./data", train=False, transform=transformations, download=True)

# Create a loader for the test set which will read the data within batch size and put into memory. 
# Note that each shuffle is set to false for the test loader.
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0)
print("The number of images in a test set is: ", len(test_loader)*batch_size)

print("The number of batches per epoch is: ", len(train_loader))
classes = ('plane', 'car', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck')

Files already downloaded and verified
The number of images in a training set is:  50000
Files already downloaded and verified
The number of images in a test set is:  10000
The number of batches per epoch is:  5000


In [4]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Assuming that we are on a CUDA machine, this should print a CUDA device:
print(device)

cuda:0


In [None]:

# functions to show an image

def imshow(img):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


# get some random training images
dataiter = iter(train_loader)
images, labels = next(dataiter)
print(images.shape)
# show images
imshow(torchvision.utils.make_grid(images))
# print labels
print(' '.join(f'{labels[j]}' for j in range(8)))

In [None]:
import torchvision.transforms as T
import torchvision.models as models
from torchvision.utils import make_grid
from torchvision.datasets import ImageFolder
for images, labels in train_loader:
    fig, ax = plt.subplots(figsize = (10, 10))
    ax.set_xticks([])
    ax.set_yticks([])
    ax.imshow(make_grid(images, 4).permute(1,2,0))
    break

In [5]:
class ConvBlock(nn.Module):
    
    def __init__(self, input_dim, block_dim, dowmSample = False):
        super(ConvBlock, self).__init__()
        self.input_dim = input_dim
        self.output_dim = block_dim*4
        self.downSample = dowmSample
        self.block = nn.Sequential(
            nn.Conv2d(input_dim,block_dim,1,bias=False),
            nn.BatchNorm2d(block_dim),
            nn.ReLU(),
            nn.Conv2d(block_dim,block_dim,3,padding=1,bias=False),
            nn.BatchNorm2d(block_dim),
            nn.ReLU(),
            nn.Conv2d(block_dim,self.output_dim,1,bias=False),
            nn.BatchNorm2d(self.output_dim),
            nn.ReLU()
        )
        self.downsample = nn.Sequential(
            nn.Conv2d(input_dim, self.output_dim, 1,bias=False),
            nn.BatchNorm2d(self.output_dim)
        )
        
    def forward(self, x):
        tmp = self.block(x)
        if self.input_dim != self.output_dim:
            res = self.downsample(x)
        else:
            res = x
        tmp = tmp + res
        return tmp


In [6]:
class ResNet50(nn.Module):
    
    def __init__(self):
        super(ResNet50, self).__init__()
        self.foot = nn.Sequential(
            nn.Conv2d(3,64,7,stride=2, padding = 3),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(3,stride=2)
        )
        self.block1 = nn.Sequential(
            ConvBlock(64,64,True),  #(64,64) => (64,64) => (64,256)
            ConvBlock(256,64), #(256,64) => (64,64) => (64,256) 
            ConvBlock(256,64)  #(256,64) => (64,64) => (64,256)
        )
        self.block2 = nn.Sequential(
            ConvBlock(256,128,True), #(256,128) => (128,128) => (128,512)
            ConvBlock(512,128), #(512,128) => (128,128) => (128,512)
            ConvBlock(512,128), #(512,128) => (128,128) => (128,512)
            ConvBlock(512,128), #(512,128) => (128,128) => (128,512)
            nn.MaxPool2d(2,2)
        )
        self.block3 = nn.Sequential(
            ConvBlock(512,256,True), #(512,256) => (256,256) => (256,1024)
            ConvBlock(1024,256),#(1024,256) => (256,256) => (256,1024)
            ConvBlock(1024,256),#(1024,256) => (256,256) => (256,1024)
            ConvBlock(1024,256),#(1024,256) => (256,256) => (256,1024)
            ConvBlock(1024,256),#(1024,256) => (256,256) => (256,1024)
            ConvBlock(1024,256),#(1024,256) => (256,256) => (256,1024)
            nn.MaxPool2d(2,2)
        )
        self.block4 = nn.Sequential(
            ConvBlock(1024,512,True),#(1024,512) => (512,512) => (512,2048)
            ConvBlock(2048,512),#(2048,512) => (512,512) => (512,2048)
            ConvBlock(2048,512),#(2048,512) => (512,512) => (512,2048)
            nn.AdaptiveAvgPool2d((1,1))
        )
        self.fc = nn.Sequential(
            nn.Linear(2048,128),
            nn.ReLU(),
            nn.Linear(128,10)
        )

        
    def forward(self, x):
        tmp = self.foot(x)
        tmp = self.block1(tmp)
        tmp = self.block2(tmp)
        tmp = self.block3(tmp)
        tmp = self.block4(tmp)
        tmp = torch.flatten(tmp,start_dim=1)
        tmp = self.fc(tmp)

        return tmp

In [7]:
model = ResNet50()
# Define your execution device
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("The model will be running on", device, "device")
# Convert model parameters and buffers to CPU or Cuda
model.to(device)

The model will be running on cuda:0 device


ResNet50(
  (foot): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3))
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (block1): Sequential(
    (0): ConvBlock(
      (block): Sequential(
        (0): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (2): ReLU()
        (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (5): ReLU()
        (6): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (8): ReLU()
      )
      (downsample): Sequential(
        

In [8]:
My_model_summary = ResNet50()
summary(My_model_summary,(3,32,32),device = "cpu")

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 64, 16, 16]           9,472
       BatchNorm2d-2           [-1, 64, 16, 16]             128
              ReLU-3           [-1, 64, 16, 16]               0
         MaxPool2d-4             [-1, 64, 7, 7]               0
            Conv2d-5             [-1, 64, 7, 7]           4,096
       BatchNorm2d-6             [-1, 64, 7, 7]             128
              ReLU-7             [-1, 64, 7, 7]               0
            Conv2d-8             [-1, 64, 7, 7]          36,864
       BatchNorm2d-9             [-1, 64, 7, 7]             128
             ReLU-10             [-1, 64, 7, 7]               0
           Conv2d-11            [-1, 256, 7, 7]          16,384
      BatchNorm2d-12            [-1, 256, 7, 7]             512
             ReLU-13            [-1, 256, 7, 7]               0
           Conv2d-14            [-1, 25

In [9]:
# Function to show the images
def imageshow(img):
    img = img / 2 + 0.5     # unnormalize
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()


# Function to test the model with a batch of images and show the labels predictions
def testBatch():
    # get batch of images from the test DataLoader  
    images, labels = next(iter(test_loader))

    # show all images as one image grid
    imageshow(torchvision.utils.make_grid(images))
   
    # Show the real labels on the screen 
    print('Real labels: ', ' '.join('%5s' % classes[labels[j]] 
                               for j in range(batch_size)))
    outputs = model(images)
    
    _, predicted = torch.max(outputs, 1)
    
    print('Predicted: ', ' '.join('%5s' % classes[predicted[j]] 
                              for j in range(batch_size)))

In [10]:
# Function to save the model
def saveModel():
    path = "./ResNet101.pth"
    torch.save(model.state_dict(), path)

# Function to test the model with the test dataset and print the accuracy for the test images
def testAccuracy():
    model.eval()
    accuracy = 0.0
    total = 0.0
    
    with torch.no_grad():
        for data in test_loader:
            images, labels = data
            images = Variable(images.to(device))
            labels = Variable(labels.to(device))
            # run the model on the test set to predict labels
            outputs = model(images)
            # the label with the highest energy will be our prediction
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            accuracy += (predicted == labels).sum().item()
    
    # compute the accuracy over all test images
    accuracy = (100 * accuracy / total)
    return(accuracy)

In [11]:
if __name__ == "__main__":
    start = time.time()
    num_epochs = 1
    best_accuracy = 0.0
    loss_fn = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.001, weight_decay=0.0001)
    for epoch in range(num_epochs):  # loop over the dataset multiple times
        running_loss = 0.0
        running_acc = 0.0

        for i, (images, labels) in enumerate(train_loader, 0):
            
            images = Variable(images.to(device))
            labels = Variable(labels.to(device))
            
            #將梯度初始化为零
            optimizer.zero_grad()
            # 向前傳播求出預測的值
            outputs = model(images)
            # 用output和label計算loss
            loss = loss_fn(outputs, labels)
            # 反向傳播
            loss.backward()
            # 更新參數
            optimizer.step()

            # Let's print statistics for every 1,000 images
            running_loss += loss.item()     # extract the loss value
            if i % 1000 == 999:    
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / 1000))
                # zero the loss
                running_loss = 0.0

        # Compute and print the average accuracy fo this epoch when tested over all 10000 test images
        accuracy = testAccuracy()
        print('For epoch', epoch+1,'the test accuracy over the whole test set is %d %%' % (accuracy))
        
        # we want to save the model if the accuracy is the best
        if accuracy > best_accuracy:
            path = "./ResNet101_weight.pth"
            torch.save(model.state_dict(), path)
            best_accuracy = accuracy
        end = time.time()
        print(f"time : {end-start}")

[1,  1000] loss: 2.028
[1,  2000] loss: 1.903
[1,  3000] loss: 1.818
[1,  4000] loss: 1.735
[1,  5000] loss: 1.650
For epoch 1 the test accuracy over the whole test set is 40 %
time : 145.9912006855011
