In [None]:
!pip install nonechucks

In [None]:
%matplotlib inline
%config InlineBackend.figure_format = 'retina'

import matplotlib.pyplot as plt
import numpy as np
import nonechucks as nc

import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from torchvision import datasets, transforms, models

import os
# Make the dataset root the working directory so that the relative paths used
# later in the notebook (os.walk('.') and data_dir = 'signs') resolve inside it.
os.chdir('/kaggle/input/chinese-zodiac-signs')

In [None]:
# Best-effort clean-up of the image files: every file below the current
# directory that PIL can open but that is not in RGB mode is re-saved as RGB.
# Files that cannot be opened, converted or re-saved are skipped (nothing is
# deleted here).
from PIL import ImageFile, Image
ImageFile.LOAD_TRUNCATED_IMAGES = True  # let PIL load truncated files instead of raising

for dirpath, _dirnames, filenames in os.walk('.'):
    for file in filenames:
        path = dirpath + '/' + file
        try:
            # The context manager closes the underlying file handle even when
            # decoding fails, so no handles leak while walking the tree.
            with Image.open(path) as im:
                rgb_im = im.convert('RGB') if im.mode != 'RGB' else None
            if rgb_im is not None:
                rgb_im.save(path)
        except Exception:
            # Deliberately best-effort: skip anything that cannot be decoded
            # or written. Unlike a bare `except:`, this does not swallow
            # KeyboardInterrupt / SystemExit, so the cell can be interrupted.
            continue

In [None]:
# Detect whether a CUDA device can be used and report where training will run.
train_on_gpu = torch.cuda.is_available()

if train_on_gpu:
    print('CUDA is available!  Training on GPU ...')
else:
    print('CUDA is not available.  Training on CPU ...')

In [None]:
# Build the datasets and data loaders for the train / valid / test splits.
data_dir = 'signs'

batch_size = 20

# Channel-wise normalization shared by every split.
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])

# Training images get a random resized crop; evaluation images are resized
# and center-cropped instead.
train_transforms = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.ToTensor(),
    normalize,
])

valid_transforms = transforms.Compose([
    transforms.Resize(255),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    normalize,
])

# The test split reuses the validation pipeline.
test_transforms = valid_transforms

# One ImageFolder per split, each wrapped in nonechucks' SafeDataset.
train_data = nc.SafeDataset(
    datasets.ImageFolder(data_dir + '/train', transform=train_transforms))
valid_data = nc.SafeDataset(
    datasets.ImageFolder(data_dir + '/valid', transform=valid_transforms))
test_data = nc.SafeDataset(
    datasets.ImageFolder(data_dir + '/test', transform=test_transforms))

# All three loaders use the same batch size and shuffle their samples.
loader_options = dict(batch_size=batch_size, shuffle=True)
trainloader = torch.utils.data.DataLoader(train_data, **loader_options)
validloader = torch.utils.data.DataLoader(valid_data, **loader_options)
testloader = torch.utils.data.DataLoader(test_data, **loader_options)

In [None]:
# check if data loading was successful: report the size of every split
for split_name, split_data in (('training', train_data),
                               ('validation', valid_data),
                               ('test', test_data)):
    print('No. {} images: '.format(split_name), len(split_data))

In [None]:
# function to reverse normalization of the images
def inv_normalize(img, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    """Undo channel-wise normalization and return the result as a new image.

    Each channel ``c`` covered by ``mean``/``std`` is mapped to
    ``img[c] * std[c] + mean[c]``; any further channels are copied unchanged.
    The input is never modified, so calling this repeatedly on the same batch
    (e.g. when a plotting cell is re-run) always yields the same picture.

    Args:
        img: channel-first image, either a NumPy array or a torch tensor.
        mean: per-channel means that were subtracted during normalization.
            Defaults to the values used by the transforms in this notebook.
        std: per-channel standard deviations the image was divided by.

    Returns:
        A new array/tensor of the same type and shape as ``img``.
    """
    # Copy first: the callers pass views into a whole batch, and writing into
    # them would silently de-normalize the batch itself.
    restored = img.clone() if torch.is_tensor(img) else np.array(img, copy=True)
    for channel, (channel_mean, channel_std) in enumerate(zip(mean, std)):
        restored[channel] = img[channel] * channel_std + channel_mean
    return restored

In [None]:
# visualize some data (images & labels) from each of the three splits
dataiter_train = iter(trainloader)
dataiter_valid = iter(validloader)
dataiter_test = iter(testloader)
# Use the built-in next(): DataLoader iterators in current PyTorch releases
# no longer provide a .next() method.
images_train, labels_train = next(dataiter_train)
images_valid, labels_valid = next(dataiter_valid)
images_test, labels_test = next(dataiter_test)
images_train, images_valid, images_test = images_train.numpy(), images_valid.numpy(), images_test.numpy()

classes = ['dog', 'dragon', 'goat', 'horse', 'monkey', 'ox', 'pig', 'rabbit', 'rat', 'rooster', 'snake', 'tiger']


def show_batch(images, labels, title, n_images=20, n_rows=2):
    """Plot up to ``n_images`` images of one batch in a grid, titled by class.

    Args:
        images: batch of normalized, channel-first images (NumPy array).
        labels: class indices matching ``images``; used to index ``classes``.
        title: figure title.
        n_images: maximum number of images to draw; also fixes the grid width.
        n_rows: number of rows in the grid.
    """
    # Integer division: add_subplot requires integer grid dimensions.
    n_cols = n_images // n_rows
    fig = plt.figure(figsize=(25, 4))
    fig.suptitle(title, fontsize=18)
    # A batch can hold fewer than n_images samples, so never index past it.
    for i in range(min(n_images, len(images))):
        ax = fig.add_subplot(n_rows, n_cols, i + 1, xticks=[], yticks=[])
        img = inv_normalize(images[i])
        # imshow expects channel-last images.
        ax.imshow(np.transpose(img, (1, 2, 0)))
        ax.set_title(classes[labels[i]])


show_batch(images_train, labels_train, 'Training Data')
show_batch(images_valid, labels_valid, 'Validation Data')
show_batch(images_test, labels_test, 'Test Data')

In [None]:
# load pretrained model: torchvision's ResNeXt-101 (32x8d) constructor is
# called with pretrained=True to request the pretrained weights.
model = models.resnext101_32x8d(pretrained=True)
# A bare expression as the last line of the cell displays the model's repr.
model

In [None]:
# Freeze the pretrained network: switch off gradient tracking for every
# parameter the model currently has.
for pretrained_param in model.parameters():
    pretrained_param.requires_grad = False

In [None]:
# Replace the network's final fully connected layer with a new classifier
# head: 2048 -> 1024 -> 512 -> 12, with ReLU and dropout between the layers.
classifier_layers = [
    nn.Linear(2048, 1024),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(1024, 512),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(512, 12),
]
model.fc = nn.Sequential(*classifier_layers)

In [None]:
# Transfer the model to the GPU when CUDA was detected.
if train_on_gpu:
    model = model.cuda()

In [None]:
# Loss function: cross-entropy over the class scores.
criterion = nn.CrossEntropyLoss()

# Optimizer: SGD with momentum, given only the parameters of the new
# classifier head (model.fc).
head_parameters = model.fc.parameters()
optimizer = optim.SGD(head_parameters, lr=0.003, momentum=0.9)

In [None]:
# Training the modified fully connected layer.
# Each epoch: one optimisation pass over the training loader, one evaluation
# pass over the validation loader, then a checkpoint if the validation loss
# is at least as good as the best seen so far.
n_epochs = 10

# `np.inf` rather than `np.Inf`: the capitalised alias was removed in NumPy 2.0.
valid_loss_min = np.inf

for epoch in range(1, n_epochs+1):

    # Running sums of (batch loss * batch size); divided by dataset size below.
    train_loss = 0.0
    valid_loss = 0.0

    # training
    model.train()
    # iterate over training data
    for data, target in trainloader:
        if train_on_gpu:
            data, target = data.cuda(), target.cuda()

        optimizer.zero_grad()

        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()*data.size(0)

    # validation
    model.eval()
    # No backward pass happens here, so skip building the autograd graph.
    with torch.no_grad():
        # iterate over validation data
        for data, target in validloader:
            if train_on_gpu:
                data, target = data.cuda(), target.cuda()

            output = model(data)
            loss = criterion(output, target)

            valid_loss += loss.item()*data.size(0)

    # Average loss per sample.
    train_loss = train_loss/len(trainloader.dataset)
    valid_loss = valid_loss/len(validloader.dataset)

    # monitor training and validation statistics
    print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
        epoch, train_loss, valid_loss))

    # save model if validation loss has decreased
    if valid_loss <= valid_loss_min:
        print('Validation loss decreased ({:.6f} --> {:.6f}).  Saving model ...'.format(
        valid_loss_min,
        valid_loss))
        torch.save(model.state_dict(), '/kaggle/working/model.pt')
        valid_loss_min = valid_loss

In [None]:
# load best performing model
# `trained_model` is another name for `model` (no copy is made), so loading
# the checkpoint also overwrites the weights seen through `model`.
trained_model = model
# map_location lets a checkpoint that was saved from the GPU be restored in a
# CPU-only session instead of failing while deserialising CUDA tensors.
checkpoint = torch.load('/kaggle/working/model.pt',
                        map_location='cuda' if train_on_gpu else 'cpu')
# NOTE(review): strict=False ignores missing/unexpected keys; the checkpoint
# is written from this same model, so strict loading should also work - confirm.
trained_model.load_state_dict(checkpoint, strict=False)
if train_on_gpu:
    trained_model.cuda()
trained_model.eval()

In [None]:
# Testing: average loss plus per-class and overall accuracy on the test loader.
test_loss = 0.0
class_correct = [0.0] * len(classes)
class_total = [0.0] * len(classes)

trained_model.eval()
# Inference only: no gradients are needed.
with torch.no_grad():
    # iterate over test data
    for data, target in testloader:
        if train_on_gpu:
            data, target = data.cuda(), target.cuda()

        output = trained_model(data)
        loss = criterion(output, target)

        test_loss += loss.item()*data.size(0)

        _, pred = torch.max(output, 1)
        is_correct = pred.eq(target.view_as(pred)).cpu().tolist()
        # calculate test accuracy for each object class.
        # Iterate over the samples actually present in this batch: the final
        # batch may hold fewer than `batch_size` items (even a single one).
        for label, hit in zip(target.cpu().tolist(), is_correct):
            class_correct[label] += hit
            class_total[label] += 1

# calculate average test loss
test_loss = test_loss/len(testloader.dataset)
print('Test Loss: {:.6f}\n'.format(test_loss))

for i in range(len(classes)):
    if class_total[i] > 0:
        print('Test Accuracy of %5s: %0.2f%% (%2d/%2d)' % (
            classes[i], 100 * class_correct[i] / class_total[i],
            np.sum(class_correct[i]), np.sum(class_total[i])))
    else:
        # This branch is reached when the test set held no sample of the class.
        print('Test Accuracy of %5s: N/A (no test examples)' % (classes[i]))

print('\nTest Accuracy (Overall): %0.2f%% (%2d/%2d)' % (
    100. * np.sum(class_correct) / np.sum(class_total),
    np.sum(class_correct), np.sum(class_total)))

In [None]:
# visualize testing: show one test batch with predicted (true) class names,
# titled green when the prediction is right and red when it is wrong.
dataiter = iter(testloader)
# Built-in next(): DataLoader iterators in current PyTorch have no .next().
images, labels = next(dataiter)

if train_on_gpu:
    images = images.cuda()

# Inference only: no gradients are needed.
with torch.no_grad():
    output = trained_model(images)
_, preds_tensor = torch.max(output, 1)
# .cpu() is a no-op for tensors already on the CPU, so one code path suffices.
preds = preds_tensor.cpu().numpy()

images = images.cpu()
images_np = images.numpy()
# plot the results
fig = plt.figure(figsize=(25, 4))
# Never index past the batch: it can hold fewer than 20 samples.
for i in range(min(20, len(images_np))):
    # Integer division: add_subplot requires integer grid dimensions.
    ax = fig.add_subplot(2, 20 // 2, i + 1, xticks=[], yticks=[])
    img = inv_normalize(images_np[i])
    img = img[:3, :, :]
    # imshow expects channel-last images.
    ax.imshow(np.transpose(img, (1, 2, 0)))
    ax.set_title("{} ({})".format(classes[preds[i]], classes[labels[i]]),
                 color=("green" if preds[i] == labels[i].item() else "red"))