In [22]:
import torch
import torch.nn as nn
import torchvision
import torch.optim as optim
import torch.optim.lr_scheduler as lr_scheduler
from torch.utils.data import DataLoader
from torch.utils.data import sampler
import torchvision.datasets as dset
import torchvision.transforms as T
import torch.nn.functional as F
import copy
import time

In [14]:
class ResNet(nn.Module):
    def __init__(self, ResidualBlock, num_classes=10):
        super(ResNet, self).__init__()
        self.inchannel = 64
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(64),
            nn.ReLU(),
        )
        self.layer1 = self.make_layer(ResidualBlock, 64,  2, stride=1)
        self.layer2 = self.make_layer(ResidualBlock, 128, 2, stride=2)
        self.layer3 = self.make_layer(ResidualBlock, 256, 2, stride=2)
        self.layer4 = self.make_layer(ResidualBlock, 512, 2, stride=2)
        self.fc = nn.Linear(512, num_classes)

    def make_layer(self, block, channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)   #strides=[1,1]
        layers = []
        for stride in strides:
            layers.append(block(self.inchannel, channels, stride))
            self.inchannel = channels
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.conv1(x)
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.fc(out)
        return out

In [15]:
class ResidualBlock(nn.Module):
    def __init__(self, inchannel, outchannel, stride=1):
        super(ResidualBlock, self).__init__()
        self.left = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, kernel_size=3, stride=stride, padding=1, bias=False),
            nn.BatchNorm2d(outchannel),
            nn.ReLU(inplace=True),
            nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=1, padding=1, bias=False),
            nn.BatchNorm2d(outchannel)
        )
        self.shortcut = nn.Sequential()
        if stride != 1 or inchannel != outchannel:
            self.shortcut = nn.Sequential(
                nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(outchannel)
            )

    def forward(self, x):
        out = self.left(x)
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [16]:
def ResNet18():
    return ResNet(ResidualBlock)

# 从训练集的50000个样本中，取49000个作为训练集，剩余1000个作为验证集
NUM_TRAIN = 49000

# 数据预处理，减去cifar-10数据均值
transform_normal = T.Compose([
    T.ToTensor(),
    T.Normalize((0.4914, 0.4822, 0.4465),(0.2023, 0.1994, 0.2010))
])
# 数据增强
transform_aug = T.Compose([
    T.RandomCrop(32, padding=4),
    T.RandomHorizontalFlip(),
    T.ToTensor(),
    T.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

In [17]:
# GPU
USE_GPU = True
dtype = torch.float32
print_every = 100

if USE_GPU and torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

print('using device:', device)

using device: cuda


In [18]:
# 验证模型在验证集或者测试集上的准确率
def check_accuracy(loader, model):
    if loader.dataset.train:
        header = 'Accuracy on validation set:'
    else:
        header = 'Accuracy on test set:'
    num_correct = 0
    num_samples = 0
    model.eval()   # set model to evaluation mode
    with torch.no_grad():
        for x,y in loader:
            x = x.to(device=device, dtype=dtype)
            y = y.to(device=device, dtype=torch.long)
            scores = model(x)
            _,preds = scores.max(1)
            num_correct += (preds==y).sum()
            num_samples += preds.size(0)
        acc = float(num_correct) / num_samples
        print('---------%s, Got %d / %d correct (%.2f)' % (header, num_correct, num_samples, 100 *acc ))
        return acc

In [24]:
def train_model(model, optimizer, epochs=1, scheduler=None):
    '''
    Parameters:
    - model: A Pytorch Module giving the model to train.
    - optimizer: An optimizer object we will use to train the model
    - epochs: A Python integer giving the number of epochs to train
    Returns: best model
    '''
    best_model_wts = None
    best_acc = 0.0
    model = model.to(device=device) # move the model parameters to CPU/GPU
    for e in range(epochs):
        if scheduler:
            scheduler.step()
        start_time = time.time()
        for t,(x,y) in enumerate(loader_train):
            model.train()   # set model to training mode
            x = x.to(device, dtype=dtype)
            y = y.to(device, dtype=torch.long)

            scores = model(x)
            loss = F.cross_entropy(scores, y)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        print('Epoch %d, loss=%.4f, time=%.4f' % (e, loss.item(), time.time()-start_time))
        acc = check_accuracy(loader_val, model)
        if acc > best_acc:
            best_model_wts = copy.deepcopy(model.state_dict())
            best_acc = acc
    print('best_acc:',best_acc)
    model.load_state_dict(best_model_wts)
    return model

In [25]:
# Load Data

# Train Data
cifar10_train = dset.CIFAR10('./data', train=True, download=True, transform=transform_aug)
loader_train = DataLoader(cifar10_train, batch_size=64, sampler=sampler.SubsetRandomSampler(range(NUM_TRAIN)))

# Validation Data
cifar10_val = dset.CIFAR10('./data', train=True, download=True, transform=transform_normal)
loader_val = DataLoader(cifar10_val, batch_size=64, sampler=sampler.SubsetRandomSampler(range(NUM_TRAIN, 50000)))

# Test Data
cifar10_test = dset.CIFAR10('./data', train=False, download=True, transform=transform_normal)
loader_test = DataLoader(cifar10_test, batch_size=64)

Files already downloaded and verified
Files already downloaded and verified
Files already downloaded and verified


In [26]:
# Model
learning_rate = 1e-2
resnet = ResNet18()
#resnet = torchvision.models.resnet18(pretrained=False)
optimizer_resnet = optim.SGD(resnet.parameters(), lr=learning_rate, momentum=0.9, nesterov=True)
scheduler = lr_scheduler.StepLR(optimizer_resnet, step_size=15,gamma=0.1)


# Train Model
best_resnet = train_model(resnet, optimizer_resnet,20, scheduler)

# Test Accuracy
check_accuracy(loader_test, best_resnet)



Epoch 0, loss=1.0462, time=26.4973
---------Accuracy on validation set:, Got 633 / 1000 correct (63.30)
Epoch 1, loss=0.7599, time=26.5409
---------Accuracy on validation set:, Got 719 / 1000 correct (71.90)
Epoch 2, loss=0.9111, time=26.5255
---------Accuracy on validation set:, Got 779 / 1000 correct (77.90)
Epoch 3, loss=0.5393, time=26.5003
---------Accuracy on validation set:, Got 801 / 1000 correct (80.10)
Epoch 4, loss=0.6241, time=26.4965
---------Accuracy on validation set:, Got 825 / 1000 correct (82.50)
Epoch 5, loss=0.4295, time=26.5045
---------Accuracy on validation set:, Got 853 / 1000 correct (85.30)
Epoch 6, loss=0.3357, time=26.4991
---------Accuracy on validation set:, Got 845 / 1000 correct (84.50)
Epoch 7, loss=0.2391, time=26.5529
---------Accuracy on validation set:, Got 826 / 1000 correct (82.60)
Epoch 8, loss=0.2538, time=26.4884
---------Accuracy on validation set:, Got 865 / 1000 correct (86.50)
Epoch 9, loss=0.5752, time=26.5048
---------Accuracy on validati

0.9133

In [28]:
from google.colab import drive
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/drive


In [29]:
PATH_FILE = '/content/drive/My Drive/Models/ResNet18_CIFAR10_ac90'
torch.save(best_resnet.state_dict(), PATH_FILE)