# Import Libraries

In [1]:
import torch
from torchvision import transforms, datasets

from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


# Import outside code

In [2]:
import numpy as np
from sklearn.datasets import load_iris, load_wine, load_breast_cancer, make_circles, make_classification, make_regression


def train_val_test_split(data, labels, split=(0.6, 0.2, 0.2)):
    # Split data #
    num_data = data.shape[0]
    num_train_data = int(num_data * split[0])
    num_val_data = int(num_data * split[1])
    train_data = data[:num_train_data]
    train_labels = labels[:num_train_data]
    val_data = data[num_train_data:num_train_data + num_val_data]
    val_labels = labels[num_train_data:num_train_data + num_val_data]
    test_data = data[num_train_data + num_val_data:]
    test_labels = labels[num_train_data + num_val_data:]
    train_val_test = (train_data, train_labels, val_data, val_labels, test_data, test_labels)
    return train_val_test


def load_skl_data(data_name, need_num=None, split=(0.6, 0.2, 0.2)):
    # Load and unpack data from sklearn & randomise #
    if data_name == 'iris':
        skl_data = load_iris()
    elif data_name == 'wine':
        skl_data = load_wine()
    elif data_name == 'breast_cancer':
        skl_data = load_breast_cancer()
    num_data = skl_data['data'].shape[0]
    random_idx = np.random.permutation(num_data)
    data = skl_data['data'][random_idx]
    labels = skl_data['target'][random_idx]

    # Require number of data #
    if need_num is not None:
        data = data[:need_num]
        labels = data[:need_num]

    # Split data #
    train_val_test = train_val_test_split(data, labels, split=split)
    return train_val_test


def load_circular_data(need_num, noise=0.1, factor=0.5, split=(0.6, 0.2, 0.2)):
    # Load circular data #
    data, labels = make_circles(n_samples=need_num, noise=noise, factor=factor)
    labels[labels == 0] = -1

    # Split data #
    train_val_test = train_val_test_split(data, labels, split=split)
    return train_val_test


def load_two_spirals(need_num, noise=0.5, split=(0.6, 0.2, 0.2)):
    # Create two spirals data #
    n = np.sqrt(np.random.rand(need_num, 1)) * 780 * (2 * np.pi) / 360
    d1x = -np.cos(n) * n + np.random.rand(need_num, 1) * noise
    d1y = np.sin(n) * n + np.random.rand(need_num, 1) * noise
    data_extended = np.vstack((np.hstack((d1x, d1y)), np.hstack((-d1x, -d1y))))
    labels_extended = np.hstack((np.ones(need_num) * -1, np.ones(need_num)))
    idx = np.random.permutation(need_num * 2)
    data_extended = data_extended[idx]
    labels_extended = labels_extended[idx]
    data = data_extended[:need_num]
    labels = labels_extended[:need_num]

    # Split data #
    train_val_test = train_val_test_split(data, labels, split=split)
    return train_val_test


def load_random_classification_dataset(need_num, need_features, need_classes=2, need_flip=0.01, class_sep=1.0, random_state=None, split=(0.6, 0.2, 0.2)):
    # Create data for classification #
    n_informative = need_classes
    n_redundant = 0
    n_repeated = 0
    n_cluster_per_class = 2
    data, labels = make_classification(n_samples=need_num, n_features=need_features, n_informative=n_informative, n_redundant=n_redundant, n_repeated=n_repeated, n_classes=need_classes, n_clusters_per_class=n_cluster_per_class, flip_y=need_flip, class_sep=class_sep, random_state=random_state)

    # Change labels to +1/-1 if it is binary classification #
    if need_classes == 2:
        labels[labels == 0] = -1

    # Split data #
    train_val_test = train_val_test_split(data, labels, split=split)
    return train_val_test


def load_random_regression_dataset(need_num, need_features, bias, noise=1, random_state=None, split=(0.6, 0.2, 0.2)):
    # Create data for regression #
    n_informative = need_features
    n_targets = 1
    data, labels = make_regression(n_samples=need_num, n_features=need_features, n_informative=n_informative, n_targets=n_targets, bias=bias, noise=noise, random_state=random_state)

    # Split data #
    train_val_test = train_val_test_split(data, labels, split=split)
    return train_val_test

In [3]:
'''ResNet in PyTorch.

BasicBlock and Bottleneck module is from the original ResNet paper:
[1] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
    Deep Residual Learning for Image Recognition. arXiv:1512.03385

PreActBlock and PreActBottleneck module is from the later paper:
[2] Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun
    Identity Mappings in Deep Residual Networks. arXiv:1603.05027
'''
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.autograd import Variable


def conv3x3(in_planes, out_planes, stride=1):
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)


class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(in_planes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class PreActBlock(nn.Module):
    '''Pre-activation version of the BasicBlock.'''
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(PreActBlock, self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = conv3x3(in_planes, planes, stride)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = conv3x3(planes, planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False)
            )

    def forward(self, x):
        out = F.relu(self.bn1(x))
        shortcut = self.shortcut(out)
        out = self.conv1(out)
        out = self.conv2(F.relu(self.bn2(out)))
        out += shortcut
        return out


class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion*planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class PreActBottleneck(nn.Module):
    '''Pre-activation version of the original Bottleneck module.'''
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(PreActBottleneck, self).__init__()
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion*planes, kernel_size=1, bias=False)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False)
            )

    def forward(self, x):
        out = F.relu(self.bn1(x))
        shortcut = self.shortcut(out)
        out = self.conv1(out)
        out = self.conv2(F.relu(self.bn2(out)))
        out = self.conv3(F.relu(self.bn3(out)))
        out += shortcut
        return out


class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = conv3x3(3,64)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x, lin=0, lout=5):
        out = x
        if lin < 1 and lout > -1:
            out = self.conv1(out)
            out = self.bn1(out)
            out = F.relu(out)
        if lin < 2 and lout > 0:
            out = self.layer1(out)
        if lin < 3 and lout > 1:
            out = self.layer2(out)
        if lin < 4 and lout > 2:
            out = self.layer3(out)
        if lin < 5 and lout > 3:
            out = self.layer4(out)
        if lout > 4:
            out = F.avg_pool2d(out, 4)
            out = out.view(out.size(0), -1)
            out = self.linear(out)
        return out


def ResNet18():
    return ResNet(PreActBlock, [2,2,2,2])

def ResNet34():
    return ResNet(BasicBlock, [3,4,6,3])

def ResNet50():
    return ResNet(Bottleneck, [3,4,6,3])

def ResNet101():
    return ResNet(Bottleneck, [3,4,23,3])

def ResNet152():
    return ResNet(Bottleneck, [3,8,36,3])


def test():
    net = ResNet18()
    y = net(Variable(torch.randn(1,3,32,32)))
    print(y.size())

# test()


# Configuration

In [4]:
"""
Configuration and Hyperparameters
"""
#torch.set_default_tensor_type(torch.cuda.FloatTensor)  # default all in GPU, in pytorch 1.9 even need dataloader to be in GPU

transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),  # can omit
    transforms.RandomHorizontalFlip(),  # can omit
    transforms.ToTensor(),
    transforms.Normalize(
        (0.4914, 0.4822, 0.4465),
        (0.2023, 0.1994, 0.2010)
    )
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(
        (0.4914, 0.4822, 0.4465),
        (0.2023, 0.1994, 0.2010)
    )
])

batch_size = 128
step_size = 0.1
random_seed = 0
epochs = 100
L2_decay = 1e-4
alpha = 1.

torch.manual_seed(random_seed)

<torch._C.Generator at 0x7fa5df8438f0>

# Data

In [5]:
"""
Data
"""
train_set = datasets.CIFAR10(root='/content/gdrive/My Drive/colab', train=True, download=True, transform=transform_train)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0)
test_set = datasets.CIFAR10(root='/content/gdrive/My Drive/colab', train=False, download=True, transform=transform_test)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0)

Files already downloaded and verified
Files already downloaded and verified


# Models, Loss, Optimiser

In [6]:
model = ResNet18()
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=step_size, momentum=0.9, weight_decay=L2_decay)
step_size_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[int(epochs / 2)], gamma=0.1)
model.cuda()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (layer1): Sequential(
    (0): PreActBlock(
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (shortcut): Sequential()
    )
    (1): PreActBlock(
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1

# Training

In [7]:
"""
Training
"""
model.train()
for epoch in range(epochs):
    epoch_loss = 0.
    for i, data in enumerate(train_loader, 0):
        optimizer.zero_grad()
        inputs, labels = data
        inputs = inputs.to('cuda')
        labels = labels.to('cuda')
        
        # Original, calculate loss #
        outputs = model(inputs)
        loss = criterion(outputs, labels)

        # Record #
        epoch_loss += loss.item()

        # Gradient Calculation & Optimisation #
        loss.backward()
        optimizer.step()
    
    # Step size scheduler #
    step_size_scheduler.step()
    
    # Print loss #
    print('{}: {}'.format(epoch, epoch_loss))

0: 632.5686386823654
1: 424.7837136387825
2: 319.62375792860985
3: 257.1788677871227
4: 218.24028033018112
5: 192.6461730003357
6: 171.2714823782444
7: 156.63160154223442
8: 141.63316869735718
9: 131.3939371407032
10: 123.12237623333931
11: 112.02484633028507
12: 107.09455287456512
13: 100.31326265633106
14: 97.63636303693056
15: 91.64715941995382
16: 86.98350904136896
17: 84.23519591987133
18: 79.32910756766796
19: 76.26769892126322
20: 75.80859439820051
21: 72.2772590778768
22: 69.60286886617541
23: 64.99016304686666
24: 67.93894432112575
25: 63.66960769146681
26: 60.581576239317656
27: 60.590733874589205
28: 59.20289444923401
29: 56.58025251701474
30: 56.21359406784177
31: 54.57218823581934
32: 53.94698477908969
33: 54.330065708607435
34: 52.182127092033625
35: 50.72013778984547
36: 50.621046259999275
37: 49.22883127629757
38: 49.13921601139009
39: 47.81098922342062
40: 46.84727675281465
41: 47.422118216753006
42: 44.344913225620985
43: 44.58485494740307
44: 45.973121877759695
45: 4

# Save model

In [8]:
# torch.save(model.state_dict(), './model_pytorch_cifar10')
# model = ResNet18()
# model.load_state_dict(torch.load('./model_pytorch_cifar10'))

# Test on Test Data

In [9]:
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for data in test_loader:
        inputs, labels = data
        inputs = inputs.to('cuda')
        labels = labels.to('cuda')
        outputs = model(inputs)
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print(correct / total)

0.9381


# Test on Train Data

In [10]:
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for data in train_loader:
        inputs, labels = data
        inputs = inputs.to('cuda')
        labels = labels.type(torch.FloatTensor).reshape(-1, 1).to('cuda')
        outputs = model(inputs)
        predicts = (torch.sign(outputs) + 1) / 2
        total += labels.size(0)
        correct += (predicts == labels).sum().item()
print(correct / total)

0.98516
