In [1]:
import torch
import torchvision
import torchvision.transforms as transforms
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.tensorboard import SummaryWriter

def imshow(img):
    """Display one normalized image tensor with matplotlib.

    Args:
        img: image tensor with the channel axis first (it is transposed to
            channel-last before plotting).  It may live on any device and may
            be attached to an autograd graph.
    """
    # Invert Normalize(mean=0.5, std=0.5) from the dataset transform.
    img = img / 2 + 0.5
    # .numpy() refuses CUDA tensors and tensors that require grad, so detach
    # and move to host memory first (both are no-ops for plain CPU tensors).
    npimg = img.detach().cpu().numpy()
    # matplotlib expects (height, width, channels).
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

In [2]:
#dataset
# Convert PIL images to tensors, then shift/scale every channel with
# mean 0.5 and std 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Training split first, then the test split; both are downloaded into ./data
# and share the same transform.
trainset, testset = (
    torchvision.datasets.CIFAR10(root='./data', train=is_train,
                                 download=True, transform=transform)
    for is_train in (True, False)
)

In [3]:
#Network
class Net(nn.Module):
    """Three-stage bottleneck residual network with grouped 3x3 convolutions.

    A 3x3 stem convolution (3 -> 64 channels) is followed by three stages of
    ``n`` bottleneck blocks each (1x1 reduce -> grouped 3x3 -> 1x1 expand,
    BatchNorm after every convolution).  The stages emit 256, 512 and 1024
    channels; the first block of stages 2 and 3 downsamples with a stride-2
    1x1 convolution.  The first block of every stage is added to a projected
    shortcut (1x1 conv + BatchNorm), every later block to its own input.
    Global average pooling and a linear layer produce the class scores.

    Args:
        n: number of bottleneck blocks per stage.
        dim_brance: bottleneck width per group in stage 1 (spelling kept
            because callers pass it by keyword).
        car: number of groups of every 3x3 convolution.  The stage-1
            bottleneck width is ``dim_brance * car``; stages 2 and 3 use two
            and four times that width.
        num_classes: number of outputs of the final linear layer.
    """

    def __init__(self, n=3, dim_brance = 64, car = 1, num_classes = 10):
        super().__init__()
        self.n = n

        dim = dim_brance*car

        self.conv = nn.Sequential(nn.Conv2d(3, 64, (3,3), padding=1, bias=False),
                                  nn.BatchNorm2d(64))

        # Attribute names, module indices and creation order are kept exactly
        # as before so existing state_dict checkpoints still load.
        # Stage 1: linear projection shortcut, spatial size unchanged.
        shortcut, blocks = self._make_stage(64, dim, 256, 1, n, car)
        self.stage1_shortcut_projection = shortcut
        self.stage1 = blocks

        # Stage 2: linear projection shortcut, downsampled by stride 2.
        shortcut, blocks = self._make_stage(256, dim*2, 512, 2, n, car)
        self.stage2_shortcut_projection = shortcut
        self.stage2 = blocks

        # Stage 3: linear projection shortcut, downsampled by stride 2.
        shortcut, blocks = self._make_stage(512, dim*4, 1024, 2, n, car)
        self.stage3_shortcut_projection = shortcut
        self.stage3 = blocks

        # Adaptive pooling averages the whole feature map whatever its size.
        # For 32x32 inputs the final map is 8x8, which matches the previous
        # fixed 8x8 average pool.
        self.GAP = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(1024, num_classes)

    @staticmethod
    def _make_stage(in_channels, width, out_channels, stride, n, car):
        """Build one stage: its shortcut projection and ``n`` bottleneck blocks.

        Only the first block sees ``in_channels`` and applies ``stride``;
        later blocks map ``out_channels`` -> ``out_channels`` at stride 1.

        Returns:
            ``(shortcut, blocks)`` where ``shortcut`` is an ``nn.Sequential``
            (1x1 conv + BatchNorm) and ``blocks`` is an ``nn.ModuleList``.
        """
        shortcut = nn.Sequential(
            nn.Conv2d(in_channels, out_channels, (1,1), stride=stride, bias=False),
            nn.BatchNorm2d(out_channels))

        blocks = nn.ModuleList()
        for block_idx in range(n):
            is_first = block_idx == 0
            blocks.append(nn.Sequential(
                nn.Conv2d(in_channels if is_first else out_channels, width, (1,1),
                          stride=stride if is_first else 1, bias=False),
                nn.BatchNorm2d(width),
                nn.ReLU(inplace=True),
                nn.Conv2d(width, width, (3,3), padding=1, groups=car, bias=False),
                nn.BatchNorm2d(width),
                nn.ReLU(inplace=True),
                nn.Conv2d(width, out_channels, (1,1), bias=False),
                nn.BatchNorm2d(out_channels)))
        return shortcut, blocks

    def forward(self, x):
        """Return the class scores, shape ``(batch, num_classes)``, for image batch ``x``."""
        x = F.relu(self.conv(x), inplace=True)

        stages = ((self.stage1_shortcut_projection, self.stage1),
                  (self.stage2_shortcut_projection, self.stage2),
                  (self.stage3_shortcut_projection, self.stage3))
        for shortcut_projection, blocks in stages:
            for block_idx, block in enumerate(blocks):
                # The first block changes channel count (and, in stages 2 and
                # 3, spatial size), so its shortcut must be projected; later
                # blocks keep the shape and use the identity.
                shortcut = shortcut_projection(x) if block_idx == 0 else x
                x = F.relu(block(x) + shortcut, inplace=True)

        x = self.GAP(x)
        x = torch.flatten(x, start_dim=1)
        x = self.fc(x)
        return x

In [4]:
#weight Initialization
def init_weights(m):
    """Kaiming-initialise a single module; intended for ``net.apply(init_weights)``.

    ``nn.Linear`` modules get Kaiming-normal weights and, when they have a
    bias, a bias filled with 0.01.  ``nn.Conv2d`` modules get Kaiming-normal
    weights (their bias, if any, is left alone).  Every other module type is
    left untouched.

    Args:
        m: the module visited by ``apply``.
    """
    if isinstance(m, nn.Linear):
        torch.nn.init.kaiming_normal_(m.weight)
        # Layers created with bias=False expose bias as None.
        if m.bias is not None:
            m.bias.data.fill_(0.01)
    elif isinstance(m, nn.Conv2d):
        torch.nn.init.kaiming_normal_(m.weight)

In [5]:
#GPU
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)

#Batch size, Dataloader
batch_size = 128
# Only the training loader shuffles; both use 4 worker processes.
trainloader, testloader = (
    torch.utils.data.DataLoader(dataset, batch_size=batch_size,
                                shuffle=is_train, num_workers=4)
    for dataset, is_train in ((trainset, True), (testset, False))
)

#Criterion
criterion = nn.CrossEntropyLoss()

In [6]:
#Make ResNet 1x64d
c = 1
d = 64

net = Net(dim_brance = d, car = c)
net.apply(init_weights)
net.to(device);

#Optimizer, Scheduler
optimizer = optim.SGD(net.parameters(), lr=0.05, weight_decay = 0.0005, momentum=0.9)
# Multiply the learning rate by 0.1 once epoch 150 and once epoch 225 have
# finished.  scheduler.step() is called at the end of every epoch.
scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[150, 225], gamma=0.1)


def evaluate(loader, progress_every):
    """Return the top-1 accuracy (in percent) of ``net`` on ``loader``.

    The caller is responsible for switching ``net`` to eval mode.  Progress is
    printed every ``progress_every`` batches.
    """
    correct = 0
    total = 0
    with torch.no_grad():
        for i, data in enumerate(loader):
            if i % progress_every == 0:
                print(i,"/",len(loader))
            inputs, labels = data[0].to(device), data[1].to(device)

            outputs = net(inputs)

            # Index of the largest score along the class axis.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return 100 * correct / total


#Train
end_epochs = 300
crop_size = 32  # side length of the crop fed to the network
pad = 4         # zero padding added on each side before cropping
epoch = 0       # stays 0 (and is used in the checkpoint name) if end_epochs is 0
it = 0

writer = SummaryWriter()
running_loss = 0.0
net.train();
for epoch in range(1, end_epochs + 1):
    for data in trainloader:
        it+=1
        inputs, labels = data[0].to(device), data[1].to(device)

        #random flip, constant padding, random crop
        # NOTE: one flip decision and one crop offset are drawn per batch, so
        # every image in a batch receives the same augmentation.
        if np.random.randint(0,2) == 0:
            inputs = torch.flip(inputs, [3])
        # Pads with zeros on the already-normalized tensors.
        inputs = F.pad(inputs, (pad, pad, pad, pad))
        H = inputs.size(2)
        W = inputs.size(3)
        # randint's upper bound is exclusive; the +1 makes the largest offset
        # reachable so the crop can also touch the bottom/right padding.
        top = np.random.randint(0, H - crop_size + 1)
        left = np.random.randint(0, W - crop_size + 1)
        inputs = inputs[:, :, top:top + crop_size, left:left + crop_size]

        #training
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Report the mean loss over every window of 100 iterations.
        running_loss += loss.item()
        if it%100==0:
            print("epoch:",epoch,", loss/100it:",running_loss / 100)
            writer.add_scalar('training loss', running_loss / 100,it)
            running_loss = 0.0

    scheduler.step()

    # Every 5 epochs, measure accuracy on both splits (without augmentation).
    if epoch%5==0:
        print("epoch:",epoch,", test")
        net.eval();

        #for Training set
        train_acc = evaluate(trainloader, 100)
        print('Accuracy on training images: %d %%' % train_acc)
        writer.add_scalar('Acc on training', train_acc,epoch)

        #for Test set
        test_acc = evaluate(testloader, 50)
        print('Accuracy on test images: %d %%' % test_acc)
        writer.add_scalar('Acc on testing', test_acc,epoch)
        net.train();


print('Finished Training')
PATH = './CIFAR10_ResNet_'+str(c)+'x'+str(d)+'d_epoch'+str(epoch)+'.pth'
torch.save(net.state_dict(), PATH)
writer.close()