In [11]:
import numpy as np    
import matplotlib.pyplot as plt       
from torchvision.transforms import transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F
import os

# Google Colab
#from fastai.vision.all import *
#set_seed(42, reproducible= True)
#source = untar_data(URLs.IMAGENETTE)

In [12]:
classes = ("Tench", "English Springer", "Cassette Player", "Chain Saw", "Church", "French Horn", "Garbage Truck", "Gas Pump", "Golf Ball", "Parachute")

def load_data():
  img_dir = 'imagenette2/'

  train = os.path.join(img_dir, 'train')
  val = os.path.join(img_dir, 'val')


  train_dataset = ImageFolder(
    train,
    transforms.Compose([
        transforms.Resize(256),
        transforms.RandomCrop(224), 
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4655, 0.4546, 0.4251), (0.2775, 0.2725, 0.2938)),
        transforms.RandomErasing()
    ]))

  test_dataset = ImageFolder(
    val,
    transforms.Compose([
        transforms.Resize(256),
        transforms.RandomCrop(224), 
        transforms.ToTensor(),
        transforms.Normalize((0.4655, 0.4546, 0.4251), (0.2775, 0.2725, 0.2938))
    ]))

  train_dataloader = DataLoader(train_dataset, batch_size=20, shuffle=True)
  test_dataloader = DataLoader(test_dataset, batch_size=20)
  
  return train_dataloader, test_dataloader

Model

In [13]:
class ResBlock(nn.Module):
    def __init__(self, in_channels, out_channels, downsample):
        super().__init__()
        if downsample:
            self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=2, padding=1)
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=2),
                nn.BatchNorm2d(out_channels)
            )
        else:
            self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1)
            self.shortcut = nn.Sequential()

        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.dropout = nn.Dropout2d(0.1)

    def forward(self, input):
        shortcut = self.shortcut(input)
        input = nn.ReLU()(self.bn1(self.conv1(input)))
        input = self.dropout(input)
        input = nn.ReLU()(self.bn2(self.conv2(input)))
        input = self.dropout(input)
        input = input + shortcut
        input = nn.ReLU()(input)
        input = self.dropout(input)
        return input


In [14]:

class ResNet34(nn.Module):
    def __init__(self, in_channels, resblock, outputs=10):
        super().__init__()
        self.layer0 = nn.Sequential(
            nn.Conv2d(in_channels, 64, kernel_size=7, stride=2, padding=3),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU()
        )

        self.layer1 = nn.Sequential(
            resblock(64, 64, downsample=False),
            resblock(64, 64, downsample=False),
            resblock(64, 64, downsample=False)
        )

        self.layer2 = nn.Sequential(
            resblock(64, 128, downsample=True),
            resblock(128, 128, downsample=False),
            resblock(128, 128, downsample=False),
            resblock(128, 128, downsample=False)
        )

        self.layer3 = nn.Sequential(
            resblock(128, 256, downsample=True),
            resblock(256, 256, downsample=False),
            resblock(256, 256, downsample=False),
            resblock(256, 256, downsample=False),
            resblock(256, 256, downsample=False),
            resblock(256, 256, downsample=False)
        )


        self.layer4 = nn.Sequential(
            resblock(256, 512, downsample=True),
            resblock(512, 512, downsample=False),
            resblock(512, 512, downsample=False),
        )

        self.gap = torch.nn.AdaptiveAvgPool2d(1)
        self.fc = torch.nn.Linear(512, outputs)
        self.dropout = nn.Dropout(0.5)

    def forward(self, input):
        input = self.layer0(input)
        input = self.layer1(input)
        input = self.layer2(input)
        input = self.layer3(input)
        input = self.layer4(input)
        input = self.gap(input)
        input = torch.flatten(input,start_dim=1)
        input = self.dropout(input)
        input = self.fc(input)

        return F.log_softmax(input, dim=1)

# Use saved model
path = "./84resnet34.pth"
model = ResNet34(3, ResBlock)
model.load_state_dict(torch.load(path))

<All keys matched successfully>

ResNet50

In [15]:

# class ResBottleneckBlock(nn.Module):
#     def __init__(self, in_channels, out_channels, downsample):
#         super().__init__()
#         self.downsample = downsample
#         self.conv1 = nn.Conv2d(in_channels, out_channels//4, kernel_size=1, stride=1)
#         self.conv2 = nn.Conv2d(out_channels//4, out_channels//4, kernel_size=3, stride=2 if downsample else 1, padding=1)
#         self.conv3 = nn.Conv2d(out_channels//4, out_channels, kernel_size=1, stride=1)
#         self.shortcut = nn.Sequential()
        
#         if self.downsample or in_channels != out_channels:
#             self.shortcut = nn.Sequential(
#                 nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=2 if self.downsample else 1),
#                 nn.BatchNorm2d(out_channels)
#             )

#         self.bn1 = nn.BatchNorm2d(out_channels//4)
#         self.bn2 = nn.BatchNorm2d(out_channels//4)
#         self.bn3 = nn.BatchNorm2d(out_channels)
#         self.dropout = nn.Dropout2d(0.1)

#     def forward(self, input):
#         shortcut = self.shortcut(input)
#         input = nn.ReLU()(self.bn1(self.conv1(input)))
#         input = self.dropout(input)
#         input = nn.ReLU()(self.bn2(self.conv2(input)))
#         input = self.dropout(input)
#         input = nn.ReLU()(self.bn3(self.conv3(input)))
#         input = self.dropout(input)
#         input = input + shortcut
#         input = nn.ReLU()(input)
#         input = self.dropout(input)
#         return input

In [16]:
# class ResNet(nn.Module):
#     def __init__(self, in_channels, resblock, repeat, useBottleneck=False, outputs=10):
#         super().__init__()
#         self.layer0 = nn.Sequential(
#             nn.Conv2d(in_channels, 64, kernel_size=7, stride=2, padding=3),
#             nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
#             nn.BatchNorm2d(64),
#             nn.ReLU(),
#         )

#         if useBottleneck:
#             filters = [64, 256, 512, 1024, 2048]
#         else:
#             filters = [64, 64, 128, 256, 512]

#         self.layer1 = nn.Sequential()
#         self.layer1.add_module('conv2_1', resblock(filters[0], filters[1], downsample=False))
#         for i in range(1, repeat[0]):
#                 self.layer1.add_module('conv2_%d'%(i+1,), resblock(filters[1], filters[1], downsample=False))

#         self.layer2 = nn.Sequential()
#         self.layer2.add_module('conv3_1', resblock(filters[1], filters[2], downsample=True))
#         for i in range(1, repeat[1]):
#                 self.layer2.add_module('conv3_%d' % (i+1,), resblock(filters[2], filters[2], downsample=False))

#         self.layer3 = nn.Sequential()
#         self.layer3.add_module('conv4_1', resblock(filters[2], filters[3], downsample=True))
#         for i in range(1, repeat[2]):
#             self.layer3.add_module('conv2_%d' % (i+1,), resblock(filters[3], filters[3], downsample=False))

#         self.layer4 = nn.Sequential()
#         self.layer4.add_module('conv5_1', resblock(filters[3], filters[4], downsample=True))
#         for i in range(1, repeat[3]):
#             self.layer4.add_module('conv3_%d'%(i+1,), resblock(filters[4], filters[4], downsample=False))

#         self.gap = torch.nn.AdaptiveAvgPool2d(1)
#         self.dropout = nn.Dropout(0.5)
#         self.fc = torch.nn.Linear(filters[4], outputs)

#     def forward(self, input):
#         input = self.layer0(input)
#         input = self.layer1(input)
#         input = self.layer2(input)
#         input = self.layer3(input)
#         input = self.layer4(input)
#         input = self.gap(input)
#         input = torch.flatten(input, start_dim=1)
#         input = self.dropout(input)
#         input = self.fc(input)
#         output = F.log_softmax(input, dim=1)
#         return output
        
# path = "./resnet101(77).pth"
# #model = ResNet(3, ResBottleneckBlock, [3, 4, 6, 3], useBottleneck=True, outputs=10)

# model = ResNet(3, ResBottleneckBlock, [3, 4, 23, 3], useBottleneck=True, outputs=10)
# model.load_state_dict(torch.load(path))

Main Training Loop

In [17]:
from torch.autograd import Variable
from torch.optim import Adam

# Function to save the model
def saveModel():
    path = "./classifier_full.pth"
    torch.save(model.state_dict(), path)

# Function to test the model with the test dataset and print the accuracy for the test images
def testAccuracy(device):
    
    model.eval()
    accuracy = 0.0
    total = 0.0
    
    with torch.no_grad():
        for data in test_loader:
            images, labels = data
            images = Variable(images.to(device))
            labels = Variable(labels.to(device))
            # run the model on the test set to predict labels
            outputs = model(images)
            # the label with the highest energy will be our prediction
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            accuracy += (predicted == labels).sum().item()
    
    # compute the accuracy over all test images
    accuracy = (100 * accuracy / total)
    return(accuracy)

def trainAccuracy(device):

    model.eval()
    accuracy = 0.0
    total = 0.0
    
    with torch.no_grad():
        for data in train_loader:
            images, labels = data
            images = Variable(images.to(device))
            labels = Variable(labels.to(device))
            # run the model on the test set to predict labels
            outputs = model(images)
            # the label with the highest energy will be our prediction
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            accuracy += (predicted == labels).sum().item()
    
    # compute the accuracy over all test images
    accuracy = (100 * accuracy / total)
    return(accuracy)

def train(num_epochs):

    best_accuracy = 0.0

    # Define your execution device
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("The model will be running on", device, "device")
    # Convert model parameters and buffers to CPU or Cuda


    model.to(device)
    
    # Define the loss function with Classification Cross-Entropy loss and an optimizer with Adam optimizer
    loss_fn = nn.CrossEntropyLoss()
    optimizer = Adam(model.parameters(), lr=0.00001, weight_decay = 0.00001)
    #optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9)
    all_accuracy = []

    for epoch in range(num_epochs):  # loop over the dataset multiple times
        running_loss = 0.0

        for i, (images, labels) in enumerate(train_loader, 0):
            # get the inputs
            images = Variable(images.to(device))
            labels = Variable(labels.to(device))

            # zero the parameter gradients
            optimizer.zero_grad()
            # predict classes using images from the training set
            outputs = model(images)
            # compute the loss based on model output and real labels
            loss = loss_fn(outputs, labels)
            # backpropagate the loss
            loss.backward()
            # adjust parameters based on the calculated gradients
            optimizer.step()

            # Let's print statistics for every 50 images
            running_loss += loss.item()     # extract the loss value
            if i % 1000 == 999:    
                # print every 50 (twice per epoch) 
                print('[%d, %5d] loss: %.3f' %
                      (epoch + 1, i + 1, running_loss / 1000))
                # zero the loss
                running_loss = 0.0

        # Compute and print the average accuracy for this epoch when tested over all test images
        accuracy = testAccuracy(device)
        train_accuracy = trainAccuracy(device)
        all_accuracy.append(accuracy)
        print('For epoch', epoch+1,'the train accuracy is %d %%' % (train_accuracy), 'the test accuracy over the whole test set is %d %%' % (accuracy))
        
        # we want to save the model if the accuracy is the best
        if accuracy > best_accuracy:
            saveModel()
            best_accuracy = accuracy
    print('The average accuracy over %d' % num_epochs, ' runs is %.2f' % (sum(all_accuracy)/num_epochs))

Showing Images

In [18]:
# Function to show the images
def imageshow(img):
    img = img / 2 + 0.5     # unnormalize
    if torch.cuda.is_available():
      npimg = img.cpu().numpy()
    else:
      npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

In [19]:
def checkTransformedImages():
    # get batch of images from the test DataLoader
    images, labels = next(iter(test_loader))
    if torch.cuda.is_available():
      images = images.cuda()
      labels = labels.cuda()

    # show all images as one image grid
    #imageshow(torchvision.utils.make_grid(images))

Run Block

In [20]:
train_loader, test_loader = load_data()

num_epochs = 50
train(num_epochs)

checkTransformedImages()

The model will be running on cuda:0 device
For epoch 1 the train accuracy is 79 % the test accuracy over the whole test set is 77 %
For epoch 2 the train accuracy is 87 % the test accuracy over the whole test set is 82 %
For epoch 3 the train accuracy is 88 % the test accuracy over the whole test set is 82 %
For epoch 4 the train accuracy is 88 % the test accuracy over the whole test set is 82 %
For epoch 5 the train accuracy is 89 % the test accuracy over the whole test set is 82 %
For epoch 6 the train accuracy is 89 % the test accuracy over the whole test set is 82 %
For epoch 7 the train accuracy is 89 % the test accuracy over the whole test set is 82 %
For epoch 8 the train accuracy is 89 % the test accuracy over the whole test set is 82 %
For epoch 9 the train accuracy is 88 % the test accuracy over the whole test set is 82 %
For epoch 10 the train accuracy is 90 % the test accuracy over the whole test set is 83 %
For epoch 11 the train accuracy is 90 % the test accuracy over the

KeyboardInterrupt: 