In [1]:
# created by Jiacheng Guo at Dec 4 15:22:58 CST 2021
# ResNet --jupyter version

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

ModuleNotFoundError: No module named 'torch'

In [None]:
class ResBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm layers plus a skip connection.

    The main path is conv(stride) -> BN -> ReLU -> conv(1) -> BN. Its output is
    added to the skip path and the sum is passed through a final ReLU.

    Args:
        inchannel: number of channels of the input feature map.
        outchannel: number of channels produced by the block.
        stride: stride of the first 3x3 convolution (and of the projection
            on the skip path, when one is needed).
    """

    def __init__(self, inchannel, outchannel, stride = 1):
        super().__init__()

        main_path = [
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(outchannel),
        ]
        self.block = nn.Sequential(*main_path)

        # The skip path is the identity (an empty Sequential) unless the main
        # path changes the tensor shape; in that case a strided 1x1 conv + BN
        # projects the input so it matches the output of the two conv layers.
        skip_path = []
        shape_preserved = stride == 1 and inchannel == outchannel
        if not shape_preserved:
            skip_path = [
                nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(outchannel),
            ]
        self.shortcut = nn.Sequential(*skip_path)

    def forward(self, X):
        """Return relu(block(X) + shortcut(X))."""
        summed = self.block(X) + self.shortcut(X)
        return F.relu(summed)
        
        

class ResNet(nn.Module):
    """Small ResNet-style classifier for 3-channel images.

    Layout: a 3x3 conv stem (3 -> 32 channels), four stages of two residual
    blocks each (32, 64, 128 and 256 channels; stages 2-4 pass ``stride=2`` to
    their first block), global average pooling and a linear classifier.

    Args:
        ResBlock: block class, instantiated as
            ``ResBlock(in_channels, out_channels, stride)``.
        num_classes: size of the output score vector.
    """

    def __init__(self, ResBlock, num_classes=4):
        super(ResNet, self).__init__()
        # Channel count fed to the next residual block; make_layer() advances
        # it as blocks are created. Starts at the stem's output width.
        self.inchannel = 32
        
        # Stem: the network expects 3-channel (e.g. RGB) input.
        self.conv = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU()
        )
        self.layer1 = self.make_layer(ResBlock, 32, 2, stride=1)
        self.layer2 = self.make_layer(ResBlock, 64, 2, stride=2)
        self.layer3 = self.make_layer(ResBlock, 128, 2, stride=2)
        self.layer4 = self.make_layer(ResBlock, 256, 2, stride=2)
        # Global average pooling to 1x1. A fixed AvgPool2d(4) only produced a
        # 1x1 map for 32x32 inputs and made `fc` fail on larger images; the
        # adaptive variant gives the same result on 32x32 and has no
        # parameters, so existing checkpoints still load.
        self.avgPool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(256, num_classes)
        
    def make_layer(self, block, channels, num_blocks, stride):
        """Build one stage as an ``nn.Sequential`` of residual blocks.

        Args:
            block: block class, called as ``block(in_channels, channels, stride)``.
            channels: output channel count of every block in the stage.
            num_blocks: number of blocks in the stage.
            stride: stride handed to the first block; the others get 1.

        Returns:
            nn.Sequential containing the blocks. As a side effect
            ``self.inchannel`` is set to ``channels``.
        """
        strides = [stride] + [1]*(num_blocks - 1)
        layers = []
        for block_stride in strides:
            layers.append(block(self.inchannel, channels, block_stride))
            self.inchannel = channels
        return nn.Sequential(*layers)
    
    def forward(self, X):
        """Return the (batch, num_classes) class scores for image batch X."""
        out = self.conv(X)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.avgPool(out)
        # (batch, 256, 1, 1) -> (batch, 256)
        out = torch.flatten(out, 1)
        out = self.fc(out)
        return out

In [None]:
def unpickle(file):
    """Read the binary file at `file` and return the object pickled in it.

    The data files used by this notebook are stored as pickles, so they are
    opened in binary mode. ``encoding='bytes'`` makes pickle return 8-bit
    strings written by Python 2 as ``bytes`` objects.
    """
    # SECURITY: pickle.load can run arbitrary code embedded in the file;
    # only call this on files from a trusted source.
    import pickle

    with open(file, mode='rb') as handle:
        return pickle.load(handle, encoding='bytes')

def get_correct_and_accuracy(y_pred, y):
    """Count correct predictions and compute the accuracy.

    Args:
        y_pred: n x C tensor of prediction scores (one row per sample).
        y: tensor of n target class labels.

    Returns:
        Tuple ``(correct, accuracy)``: the number of rows whose highest-scoring
        class equals the label, and that number divided by n.
    """
    total = y.shape[0]
    # The predicted class is the column index of the largest score in each row.
    predicted = y_pred.max(dim=1).indices
    hits = predicted.eq(y).sum().item()
    return hits, hits/total

## Read the two pickled batch files. Batch 1 supplies the training and
## validation data, batch 2 is used as the test set.
batch_1_dictionary = unpickle('cifar-10-data/data_batch_1')
batch_2_dictionary = unpickle('cifar-10-data/data_batch_2')

## Images: each flat row of 3072 values is viewed as 3 channels x 32 x 32.
X_train_all = np.array(batch_1_dictionary[b'data']).reshape(10000, 3, 32, 32)
y_train_all = np.array(batch_1_dictionary[b'labels'])
X_test = np.array(batch_2_dictionary[b'data']).reshape(10000, 3, 32, 32)
y_test = np.array(batch_2_dictionary[b'labels'])

## Hold out the tail of batch 1 for validation (head 9000 / tail 1000).
validation_count = 1000
train_count = len(X_train_all) - validation_count
X_train, X_val = X_train_all[:train_count], X_train_all[train_count:]
y_train, y_val = y_train_all[:train_count], y_train_all[train_count:]

## Scale pixel values by 1/255 (assumes 8-bit RGB values in 0..255).
X_train_normalized, X_val_normalized, X_test_normalized = (
    pixels / 255 for pixels in (X_train, X_val, X_test)
)

In [None]:
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

# Preferred compute device.
# NOTE(review): nothing in this cell moves the model or the tensors onto
# `device`, so everything created below lives on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameters
n_iteration = 20   # number of passes over the training set
batch_size = 32
# Learning rate actually handed to the optimizer. This constant used to be
# 0.01 but was never read (Adam was hard-coded to 1e-3); it now holds the
# value training really used, so the effective behaviour is unchanged.
lr = 1e-3

resNet = ResNet(ResBlock, num_classes=10)
print("model structure:", resNet)

optimizer = optim.Adam(resNet.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()
n_data = X_train_normalized.shape[0]
# Round up so a final, smaller mini-batch is still visited.
n_batch = int(np.ceil(n_data/batch_size))

# convert X_train and X_val to float32 tensors
X_train_tensor = torch.tensor(X_train_normalized, dtype=torch.float32)
X_val_tensor = torch.tensor(X_val_normalized, dtype=torch.float32)

# convert the labels to int64 tensors, the target type used with CrossEntropyLoss
y_train_tensor = torch.tensor(y_train).long()
y_val_tensor = torch.tensor(y_val).long()

print('X train tensor shape:', X_train_tensor.shape)
print('n_batch: ', n_batch)

In [None]:
## Training loop: one full pass over the training set per iteration,
## followed by an evaluation on the validation set.
import time

# Per-iteration logs, kept so they can be plotted later.
train_loss_list = np.zeros(n_iteration)
train_accu_list = np.zeros(n_iteration)
val_loss_list = np.zeros(n_iteration)
val_accu_list = np.zeros(n_iteration)

for i in range(n_iteration):
    start_time = time.time()
    train_loss = []    # mean loss of every mini-batch in this iteration
    train_crt = 0      # correctly classified training samples so far
    train_seen = 0     # training samples processed so far

    # Training mode: batch-norm layers use (and update from) batch statistics.
    resNet.train()
    for j in range(n_batch):
        batch_start_index = j*batch_size
        # get data batch from the normalized data
        X_batch = X_train_tensor[batch_start_index:batch_start_index+batch_size]
        # get ground truth label y
        y_batch = y_train_tensor[batch_start_index:batch_start_index+batch_size]

        train_pred = resNet(X_batch)
        train_loss_i = criterion(train_pred, y_batch)

        # Accumulate over the whole iteration; previously only the accuracy of
        # the last mini-batch was reported as the training accuracy.
        train_crt += get_correct_and_accuracy(train_pred, y_batch)[0]
        train_seen += y_batch.shape[0]
        # .item() yields a plain Python float regardless of the tensor's device.
        train_loss.append(train_loss_i.item())

        # Backpropagation
        optimizer.zero_grad()
        train_loss_i.backward()
        optimizer.step()

    train_accu = train_crt / train_seen
    ave_train_loss = np.sum(train_loss)/len(train_loss)

    # Validation: eval mode makes batch-norm use its running statistics and
    # stops the validation data from updating them; no_grad avoids building
    # an autograd graph for the whole validation set.
    resNet.eval()
    with torch.no_grad():
        val_pred = resNet(X_val_tensor)
        val_crt, val_accu = get_correct_and_accuracy(val_pred, y_val_tensor)
        val_loss = criterion(val_pred, y_val_tensor).item()

    print("Iter %d ,Train loss: %.3f, Train acc: %.3f, Val loss: %.3f, Val acc: %.3f" 
          %(i ,ave_train_loss, train_accu, val_loss, val_accu)) 
    ## add to the logs so that we can use them later for plotting
    train_loss_list[i] = ave_train_loss
    train_accu_list[i] = train_accu
    val_loss_list[i] = val_loss
    val_accu_list[i] = val_accu
    print("iteration", i, ":\t", time.time() - start_time)
    