In [1]:
import torch
import torch.nn as nn
import numpy as np
from torch.utils.data import DataLoader, Dataset
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import Compose, ToTensor, Normalize, RandomRotation, ToPILImage
import torchvision.transforms.functional as TF
import random
from matplotlib import pyplot as plt
from random import randint

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [3]:
# Define a transform to normalize the data
transform = Compose([ToTensor(), Normalize(mean=(0.5,), std=(0.5,))])

In [4]:
root_dir = 'fashion_mnist'
batch_size=64

# Download and load the training data
trainset = FashionMNIST(root=root_dir, download=True, train=True, transform=transform)

# Download and load the test data
testset = FashionMNIST(root=root_dir, download=True, train=False, transform=transform)

In [5]:
class FashionMNISTAugmentedDataset(Dataset):

    def __init__(self, data, target, transform):
        self.data = data
        self.target = target
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        angle = random.choice([0, 90, 180, 270])
        
        datapoint = self.transform(self.data[index], angle)
        
        # need to use ints for classes [0, 90, 180, 270]
        if angle == 0:
            rotation_class = 0 
        if angle == 90:
            rotation_class = 1
        if angle == 180:
            rotation_class = 2
        if angle == 270:
            rotation_class = 3
        
        target = rotation_class

        return datapoint, target

In [6]:
# Rotate given images by given angle

def my_segmentation_transforms(image, angle):
    image = TF.to_pil_image(image)
    image = TF.resize(image, 32, interpolation=2)
    image = TF.rotate(image, angle)
    image = TF.to_tensor(image)
    image = TF.normalize(image, (0.5, ), (0.5, ))

    return image

In [7]:
batch_size=64

n_train = int(len(trainset) * 0.8)

train_augmented = FashionMNISTAugmentedDataset(
    # next line is for real training
    data=trainset.data[:n_train],
    # use next line only because of performance issues
#     data=trainset.data[:300], 
    # next line is for real training
    target=trainset.targets[:n_train],
    # use next line only because of performance issues
#     target=trainset.targets[:300], 
    transform=my_segmentation_transforms)
trainloader_augmented = DataLoader(train_augmented, batch_size=batch_size, shuffle=True, num_workers=32)

val_augmented = FashionMNISTAugmentedDataset(
    # next line is for real training
    data=trainset.data[n_train:], 
    # use next line only because of performance issues
#     data=trainset.data[300:400],
    # next line is for real training
    target=trainset.targets[n_train:],
    # use next line only because of performance issues
#     target=trainset.targets[300:400], 
    transform=my_segmentation_transforms)
valloader_augmented = DataLoader(val_augmented, batch_size=batch_size, shuffle=True, num_workers=32)

test_augmented = FashionMNISTAugmentedDataset(
    data=testset.data, 
    target=testset.targets, 
    transform=my_segmentation_transforms)
testloader_augmented = DataLoader(test_augmented, batch_size=batch_size, shuffle=True, num_workers=32)

In [9]:
import time
from torch.autograd import Variable

def train(model, loss_fn, optimizer, scheduler, num_epochs, trainloader, valloader):
    best_model_wts = model.state_dict()
    best_acc = 0.0
    dataloader = None
    dataset_size = 0
    
    for epoch in range(num_epochs):
        since = time.time()

        print('Epoch {}/{}'.format(epoch+1, num_epochs))

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                scheduler.step()
                model.train(True)  # Set model to training mode
                dataloader = trainloader
                dataset_size = len(trainloader.dataset)
            else:
                model.train(False)  # Set model to evaluate mode
                dataloader = valloader
                dataset_size = len(valloader.dataset)
             
            running_loss = 0.0
            running_corrects = 0.0

            # Iterate over data.
            
            for data in dataloader:
                # get the inputs
                inputs, labels = data
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                outputs = model(inputs)
#                 print("outputs:", outputs)
                _, preds = torch.max(outputs.data, 1)
                
#                 print("labels:", labels)
                loss = loss_fn(outputs, labels)

                # backward + optimize only if in training phase
                if phase == 'train':
                    loss.backward()
                    optimizer.step()

                # statistics
                running_loss += loss.item()
                running_corrects += torch.sum(preds == labels.data).to(torch.float32)

            epoch_loss = running_loss / dataset_size
            epoch_acc = running_corrects / dataset_size

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = model.state_dict()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model

In [10]:
import torch
import torch.nn as nn
from torch.autograd import Variable

def flatten(x): 
    return x.view(x.size(0), -1)


def conv3x3(in_planes, out_planes, stride=1):
    "3x3 convolution with padding"
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)


class BasicBlock(nn.Module):

    expansion = 1       

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()

        self.conv1 = conv3x3(in_planes, planes, stride)
        self.conv2 = conv3x3(planes, planes)

        self.bn1 = nn.BatchNorm2d(planes)
        self.bn2 = nn.BatchNorm2d(planes)

        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample

        self.stride = stride

    def forward(self, x):

        residue = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        if self.downsample is not None:
            residue = self.downsample(x)

        out += residue
        out = self.relu(out)
        return out


class Bottleneck(nn.Module):
    
    expansion = 4

    def __init__(self, in_planes, planes, stride=1, downsample=None):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        
        self.conv3 = nn.Conv2d(planes, self.expansion*planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)

        self.downsample = downsample
        
        self.stride = stride
        
    def forward(self, x):

        residue = x
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.relu(self.bn2(self.conv2(x)))
        out = self.bn3(self.conv3(out))

        if self.downsample is not None:
            residue = self.downsample(x)

        out += residue
        out = self.relu(out)
        return out
            
            
class ResNet(nn.Module):

    def __init__(self, depth, name, num_classes=10, block=BasicBlock):
        super(ResNet, self).__init__()

        assert (depth - 2) % 6 == 0, 'Depth should be 6n + 2'
        n = (depth - 2) // 6

        self.name = name
        block = BasicBlock
        self.inplanes = 16
        fmaps = [16, 32, 64] # CIFAR10

        self.conv = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)

        self.layer1 = self._make_layer(block, fmaps[0], n, stride=1)
        self.layer2 = self._make_layer(block, fmaps[1], n, stride=2)
        self.layer3 = self._make_layer(block, fmaps[2], n, stride=2)

        self.avgpool = nn.AvgPool2d(kernel_size=8, stride=1)
        self.flatten = flatten
        self.fc = nn.Linear(fmaps[2] * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def _make_layer(self, block, planes, blocks, stride=1):
        ''' Between layers convolve input to match dimensions -> stride = 2 '''

        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                    nn.Conv2d(self.inplanes, planes * block.expansion,
                              kernel_size=1, stride=stride, bias=False),
                    nn.BatchNorm2d(planes * block.expansion))

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)


    def forward(self, x, print_sizes=False):
        
        if print_sizes:
            print('Sizes of the tensors inside each node: \n')
            print("\t In Model: input size", x.size())
        
        x = self.relu(self.bn(self.conv(x)))    # 32x32
        
        x = self.layer1(x)                      # 32x32
        x = self.layer2(x)                      # 16x16
        x = self.layer3(x)                      # 8x8

        x = self.avgpool(x)                     # 1x1
        x = self.flatten(x)                     # Flatten
        x  = self.fc(x)                         # Dense
        
        if print_sizes:
            print("\t In Model: output size", x.size())
            
        return x

In [11]:
def ResNet20(**kwargs):    
    return ResNet(name = 'ResNet20', depth = 20, num_classes=4,**kwargs)

In [12]:
resnet20 = ResNet20()
# fitting the convolution to 1 input channel (instead of 3)
resnet20.conv = nn.Conv2d(1, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)

In [13]:
# Criteria NLLLoss which is recommended with Softmax final layer
loss_fn = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optim = torch.optim.Adam(resnet20.parameters(), lr=0.001)

# Decay LR by a factor of 0.1 every 4 epochs
sched = torch.optim.lr_scheduler.StepLR(optimizer=optim, step_size=4, gamma=0.1)

# Number of epochs
eps=5

resnet20_trained = train(resnet20, loss_fn, optim, sched, eps, trainloader_augmented, valloader_augmented)

Epoch 1/5
train Loss: 0.0238 Acc: 0.2700
val Loss: 0.0302 Acc: 0.2200
Epoch 2/5
train Loss: 0.0200 Acc: 0.4333
val Loss: 0.0265 Acc: 0.3200
Epoch 3/5
train Loss: 0.0176 Acc: 0.5667
val Loss: 0.0278 Acc: 0.3300
Epoch 4/5
train Loss: 0.0150 Acc: 0.6900
val Loss: 0.0232 Acc: 0.4700
Epoch 5/5
train Loss: 0.0129 Acc: 0.7233
val Loss: 0.0200 Acc: 0.6100
Training complete in 1m 19s
Best val Acc: 0.610000


In [14]:
# Transform given images for fashion classification (without rotation)

def my_classification_transforms(image):
    image = TF.to_pil_image(image)
    image = TF.resize(image, 32, interpolation=2)
    image = TF.to_tensor(image)
    image = TF.normalize(image, (0.5, ), (0.5, ))

    return image

In [15]:
# create Dataset for fashion classification (with fashion labels)

class FashionMNISTClassificationDataset(Dataset):

    def __init__(self, data, target, transform):
        self.data = data
        self.target = target
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        
        datapoint = self.transform(self.data[index])
        targetpoint = self.target[index]

        return datapoint, targetpoint

In [16]:
# dataloader for classification -> take only a subset

batch_size=1

# generate random indices for a 10% subset
number_trainInst = int(len(trainset))
subset_indices = np.random.choice(number_trainInst, number_trainInst/10, replace=False)
subset_data = torch.zeros([number_trainInst/10, 28, 28], dtype=torch.float)
subset_targets = np.zeros(number_trainInst/10, dtype=long)
# create subset for 10% of the original training set
for i, index in enumerate(subset_indices):
    subset_data[i] = trainset.data[index]
    subset_targets[i] = trainset.targets[index]

n_train = int(len(subset_data) * 0.8)

train_augmented_classification = FashionMNISTClassificationDataset(
    data=subset_data[:n_train], 
    # use next line only because of performance issues
#     data=subset_data[:50], 
    target=subset_targets[:n_train], 
    # use next line only because of performance issues
#     target=subset_targets[:50], 
    transform=my_classification_transforms)

trainloader_classification = DataLoader(train_augmented_classification, batch_size=batch_size, shuffle=True, num_workers=32)

val_augmented_classification = FashionMNISTClassificationDataset(
    data=subset_data[n_train:], 
    # use next line only because of performance issues
#     data=subset_data[100:150],
    target=subset_targets[n_train:], 
    # use next line only because of performance issues
#     target=subset_targets[100:150], 
    transform=my_classification_transforms)

valloader_classification = DataLoader(val_augmented_classification, batch_size=batch_size, shuffle=True, num_workers=32)

test_augmented_classification = FashionMNISTClassificationDataset(
    data=testset.data, 
    target=testset.targets, 
    transform=my_classification_transforms)

testloader_classification = DataLoader(test_augmented_classification, batch_size=batch_size, shuffle=True, num_workers=32)

In [17]:
# Criteria NLLLoss which is recommended with Softmax final layer
loss_fn = nn.CrossEntropyLoss()

# freeze all layers of the trained model
for param in resnet20_trained.parameters():
    param.requires_grad = False

# # unfreeze layer3
# for param in resnet20_trained.layer3.parameters():
#     param.requires_grad = True

# unfreeze final fc layer
for param in resnet20_trained.fc.parameters():
    param.requires_grad = True

# replace fc layer with 10 outputs
resnet20_trained.fc = nn.Linear(64, 10)

# Observe that all parameters are being optimized
optim = torch.optim.Adam(resnet20_trained.parameters(), lr=0.001)

# Decay LR by a factor of 0.1 every 4 epochs
sched = torch.optim.lr_scheduler.StepLR(optimizer=optim, step_size=4, gamma=0.1)

# Number of epochs
eps=5

resnet20_trainedClassification = train(resnet20_trained, loss_fn, optim, sched, eps, trainloader_classification, valloader_classification)

Epoch 1/5
train Loss: 2.4916 Acc: 0.0800
val Loss: 2.6453 Acc: 0.1200
Epoch 2/5
train Loss: 2.2884 Acc: 0.1800
val Loss: 2.6369 Acc: 0.1200
Epoch 3/5
train Loss: 2.2528 Acc: 0.1600
val Loss: 2.5940 Acc: 0.1800
Epoch 4/5
train Loss: 2.2585 Acc: 0.1400
val Loss: 2.6459 Acc: 0.1400
Epoch 5/5
train Loss: 2.1968 Acc: 0.1800
val Loss: 2.6149 Acc: 0.1400
Training complete in 0m 11s
Best val Acc: 0.180000


In [18]:
import matplotlib.pyplot as plt

# summarize the results graphically

def stats_plot(trainLoss, valLoss, valAccuracy):
    # x axis is number of epochs
    x = np.arange(0, len(valLoss), 1)

    fig=plt.figure(figsize=(12, 4), dpi= 80, facecolor='w', edgecolor='k')
    plt.figure(1)

    plt.subplot(221)
    plt.ylabel('training loss', fontsize=14, color='black')
    plt.grid(True)
    plt.plot(x, trainLoss, 'b', linewidth=2)


    plt.subplot(223)
    plt.ylabel('validation loss', fontsize=14, color='black')
    plt.plot(x, valLoss, 'r', linewidth=2)
    plt.grid(True)
    plt.xlabel('epoches', fontsize=14, color='black')


    plt.subplot(122)
    plt.ylabel('validation accuracy', fontsize=14, color='black')
    plt.plot(x, valAccuracy, 'g', linewidth=3)
    #plt.hlines(y=46.5, color='orange', label = '46.4%% threshold')
    plt.grid(True)
    plt.xlabel('epoches', fontsize=14, color='black')

    plt.tight_layout()

    plt.show()