In [0]:
# --- Notebook configuration: values a reader is most likely to tune ---
# Number of rows per category exported to the training folder.
trainSize = 100000
# Number of rows per category exported to the validation folder
# (read from the rows that follow the training rows).
validSize = 2000
# Number of output units requested from the classifier head.
numClass = 200
# NOTE(review): not referenced in the visible cells — presumably the top-k size
# used when reporting predictions; confirm against later cells.
TopPick = 5
# Mini-batch size shared by the training and validation data loaders.
batch_size = 64
# Number of epochs handed to train_model.
numEpoch = 100
# Backbone selector; the cells below handle 'vgg' and 'densenet'.
model_name = 'vgg' 
# NOTE(review): not referenced in the visible cells — presumably an image used
# by a later inference demo; confirm.
sample_img_name = 'example.jpg'

import numpy as np
import os
from PIL import Image
import time
import json
import copy
import matplotlib.pyplot as plt
import seaborn as sns
from collections import OrderedDict
import torch
from torch import nn, optim
from torch.optim import lr_scheduler
from torch.autograd import Variable
import torchvision
from torchvision import datasets, models, transforms
from torch.utils.data.sampler import SubsetRandomSampler
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.functional as TF
import random

In [0]:
# Preprocessing images in the dataset
# Every "Dataset/<name>.npy" file is treated as one category: its first
# trainSize rows are written as JPEGs under Images/Training/<name>/ and the
# following validSize rows under Images/Validation/<name>/.
baseDirT = "Images//Training//"
baseDirV = "Images//Validation//"

# makedirs(..., exist_ok=True) creates missing parents and tolerates folders
# that already exist, so the cell can be re-run and also recovers when only
# part of the directory tree is present.
os.makedirs(baseDirT, exist_ok=True)
os.makedirs(baseDirV, exist_ok=True)

category = []
allNum = 0  # running counter so every exported image gets a unique file name
for file in os.listdir("Dataset"):
    if not file.endswith(".npy"):
        continue
    Name = file[:-4]  # category name = file name without the ".npy" suffix
    NbaseDirT = baseDirT + Name + '//'
    NbaseDirV = baseDirV + Name + '//'
    os.makedirs(NbaseDirT, exist_ok=True)
    os.makedirs(NbaseDirV, exist_ok=True)
    tempFile = np.load("Dataset//" + file)
    # Fail before writing anything instead of hitting an IndexError after
    # thousands of files have already been saved.
    if len(tempFile) < trainSize + validSize:
        raise ValueError(
            "{} holds {} rows but trainSize + validSize = {} are required".format(
                file, len(tempFile), trainSize + validSize))
    # Each row is reshaped to 28x28; -(v - 255) flips the intensities
    # (presumably the arrays are uint8 so this equals 255 - v — TODO confirm).
    for num in range(trainSize):
        im = Image.fromarray((-(tempFile[num] - 255)).reshape(28,28))
        fileName = str(allNum) + ".jpg"
        im.save(NbaseDirT + fileName, 'JPEG')
        allNum += 1
    # Validation images come from the rows right after the training rows.
    for num in range(validSize):
        im = Image.fromarray((-(tempFile[num + trainSize] - 255)).reshape(28,28))
        fileName = str(allNum) + ".jpg"
        im.save(NbaseDirV + fileName, 'JPEG')
        allNum += 1

In [0]:
# Disabled Google Colab helper: the statements below sit inside a string
# literal, so they are never executed. Remove the surrounding triple quotes to
# mount Google Drive when running on Colab.
"""
from google.colab import drive
drive.mount('/content/drive')
"""

In [0]:
# Pick the compute device: the first CUDA GPU when one is available, else the CPU.
train_on_gpu = torch.cuda.is_available()

print('Training on GPU' if train_on_gpu else 'Training on CPU')

if train_on_gpu:
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [0]:
# Root folder of the exported images and its two per-split sub-folders.
data_dir = 'Images'
train_dir = '{}//Training'.format(data_dir)
valid_dir = '{}//Validation'.format(data_dir)

In [0]:
# Data Transformation
class MyRotationTransform:
    """Callable transform that rotates its input by a randomly picked angle.

    The angle is drawn with ``random.choice`` from the ``angles`` sequence
    given at construction time, and the rotation itself is delegated to
    ``TF.rotate``.
    """

    def __init__(self, angles):
        """Remember the candidate angles to choose from on every call."""
        self.angles = angles

    def __call__(self, x):
        """Return ``x`` rotated by one randomly selected entry of ``self.angles``."""
        picked_angle = random.choice(self.angles)
        rotated = TF.rotate(x, picked_angle)
        return rotated

# Training images are rotated by a random multiple of 90 degrees as augmentation.
rotation_transform = MyRotationTransform(angles=[-90, 0, 90, 180])

# Per-split preprocessing: both splits are resized to 224 and converted to
# tensors; only the training split gets the random rotation.
data_transforms = {
    'Training': transforms.Compose([
        transforms.Resize(224),
        rotation_transform,
        transforms.ToTensor(),
    ]),
    'Validation': transforms.Compose([
        transforms.Resize(224),
        transforms.ToTensor(),
    ])
}

# One ImageFolder per split; class labels come from the sub-folder names.
image_datasets = {
    x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x])
    for x in ['Training', 'Validation']
}

# Shuffle only the training split: shuffling validation data adds nothing to
# the metric and makes evaluation order non-deterministic.
dataloaders = {
    x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size,
                                   shuffle=(x == 'Training'), num_workers=4)
    for x in ['Training', 'Validation']
}

class_names = image_datasets['Training'].classes

# Number of images per split, used to normalise epoch loss and accuracy.
dataset_sizes = {x: len(image_datasets[x]) for x in ['Training', 'Validation']}

In [0]:
# Show the number of images per split and the selected device, one per line.
print(dataset_sizes, device, sep='\n')

In [0]:
# Fetch a single training mini-batch and display the shape of its image tensor
# as a quick sanity check of the data pipeline.
images, labels = next(iter(dataloaders['Training']))
images.shape

In [0]:
# Choose the CNN architecture
# num_in_features is the input width given to the replacement classifier head
# built in a later cell.

if model_name == 'densenet':
    model = models.densenet161(pretrained=True)
    num_in_features = 2208
elif model_name == 'vgg':
    model = models.vgg19(pretrained=True)
    num_in_features = 25088
else:
    # Fail here with a clear message instead of a NameError on `model` below.
    raise ValueError(
        "Unsupported model_name {!r}; expected 'densenet' or 'vgg'".format(model_name))

# Freeze every pretrained weight; only a head attached afterwards will train.
for param in model.parameters():
    param.requires_grad = False

In [0]:
# Build model

def build_classifier(num_in_features, hidden_layers, num_out_features):
    """Build a fully connected classifier head as an ``nn.Sequential``.

    Args:
        num_in_features: width of the feature vector fed into the head.
        hidden_layers: sequence of hidden layer widths, or ``None`` / empty
            for a single linear layer mapping inputs straight to outputs.
        num_out_features: number of output units (one per class).

    Returns:
        nn.Sequential producing raw, un-normalised scores. With hidden layers
        the module names are ``fc0, relu0, drop0, fc1, relu1, drop1, ...,
        output``; without them the head is the single module ``fc0``.
    """
    classifier = nn.Sequential()
    if not hidden_layers:
        # The output width must follow num_out_features, not a fixed constant,
        # otherwise the head disagrees with the number of target classes.
        classifier.add_module('fc0', nn.Linear(num_in_features, num_out_features))
        return classifier

    classifier.add_module('fc0', nn.Linear(num_in_features, hidden_layers[0]))
    classifier.add_module('relu0', nn.ReLU())
    classifier.add_module('drop0', nn.Dropout(.6))
    # Each further hidden layer gets its own Linear -> ReLU -> Dropout triple.
    # Names are registered exactly once: re-adding an existing name would
    # replace the module at its old position and leave the new Linear layer
    # without an activation after it.
    layer_sizes = zip(hidden_layers[:-1], hidden_layers[1:])
    for i, (h1, h2) in enumerate(layer_sizes, start=1):
        classifier.add_module('fc' + str(i), nn.Linear(h1, h2))
        classifier.add_module('relu' + str(i), nn.ReLU())
        classifier.add_module('drop' + str(i), nn.Dropout(.5))
    classifier.add_module('output', nn.Linear(hidden_layers[-1], num_out_features))
    return classifier

# With hidden_layers left as None, build_classifier returns a single linear layer.
hidden_layers = None

classifier = build_classifier(
    num_in_features=num_in_features,
    hidden_layers=hidden_layers,
    num_out_features=numClass,
)

In [0]:
# Attach the new head and set up loss, optimizer and learning-rate schedule.
# The head built by build_classifier ends in a plain Linear layer (raw scores,
# no LogSoftmax), so CrossEntropyLoss is the matching criterion for both
# backbones; NLLLoss would require log-probabilities as input.
if model_name == 'densenet':
    model.classifier = classifier
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adadelta(model.parameters())
    sched = optim.lr_scheduler.StepLR(optimizer, step_size=4)
elif model_name == 'vgg':
    model.classifier = classifier
    criterion = nn.CrossEntropyLoss()
    # Only the new head is handed to the optimizer.
    optimizer = optim.Adam(model.classifier.parameters(), lr=0.0001)
    sched = lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.1)
else:
    # Fail loudly rather than leaving criterion/optimizer/sched undefined.
    raise ValueError(
        "Unsupported model_name {!r}; expected 'densenet' or 'vgg'".format(model_name))

In [0]:
# train the model
def train_model(model, criterion, optimizer, sched, num_epochs=5):
    """Train ``model`` and return it loaded with its best validation weights.

    Every epoch runs a 'Training' pass (with gradient updates) followed by a
    'Validation' pass (gradients disabled). The weights of the epoch with the
    highest validation accuracy are kept and restored before returning.

    Reads the module-level ``dataloaders``, ``dataset_sizes`` and ``device``.

    Args:
        model: network to optimise.
        criterion: loss called as ``criterion(outputs, labels)``.
        optimizer: optimizer updating the trainable parameters.
        sched: learning-rate scheduler stepped once per epoch after the
            training pass; ``None`` disables scheduling.
        num_epochs: number of epochs to run.

    Returns:
        The same ``model`` object, holding the best-scoring weights.
    """
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)
        for phase in ['Training', 'Validation']:
            is_training = phase == 'Training'
            # train(False) is the same as eval(): dropout/batch-norm switch mode.
            model.train(is_training)
            running_loss = 0.0
            running_corrects = 0
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                optimizer.zero_grad()
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    if is_training:
                        loss.backward()
                        optimizer.step()
                # The loss is weighted by the batch size before accumulating,
                # so the division by the split size below yields a per-sample
                # value when the criterion returns a batch mean.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels).item()
            # Advance the schedule once per epoch, after the optimizer steps;
            # without this call the configured learning-rate decay never happens.
            if is_training and sched is not None:
                sched.step()
            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))
            if phase == 'Validation' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
        print()
    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))
    model.load_state_dict(best_model_wts)
    return model

In [0]:
# Move the network to the selected device, then train for the configured
# number of epochs; train_model returns the model with its best weights.
epochs = numEpoch
model = train_model(model.to(device), criterion, optimizer, sched, num_epochs=epochs)

In [0]:
# test classification accuracy
model.eval()

correct = 0
total = 0

# no_grad avoids building autograd graphs during pure inference.
with torch.no_grad():
    for inputs, labels in dataloaders['Validation']:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        # Count hits per sample: averaging per-batch means would over-weight a
        # smaller final batch.
        correct += (outputs.max(1)[1] == labels).sum().item()
        total += labels.size(0)

# Fraction of validation images classified correctly (0.0 for an empty loader).
accuracy = correct / total if total else 0.0

print("Test accuracy: {:.3f}".format(accuracy))

In [0]:
# Keep the training dataset's class_to_idx mapping on the model object so it
# can be stored in the checkpoint together with the weights.
model.class_to_idx = image_datasets['Training'].class_to_idx

In [0]:
# Save the trained model

# Store a fresh copy of the backbone that was actually selected; any value
# other than 'densenet' keeps the previous vgg19 default.
if model_name == 'densenet':
    base_model = models.densenet161(pretrained=True)
else:
    base_model = models.vgg19(pretrained=True)

# Metadata is taken from the live configuration instead of literals so the
# checkpoint always describes the model that was trained.
checkpoint = {'input_size': num_in_features,
              'output_size': numClass,
              'epochs': epochs,
              'batch_size': batch_size,
              'model_name': model_name,
              'model': base_model,
              'classifier': classifier,
              'scheduler': sched,
              'optimizer': optimizer.state_dict(),
              'state_dict': model.state_dict(),
              'class_to_idx': model.class_to_idx
             }

torch.save(checkpoint, 'checkpoint.pth')

In [0]:
# Disabled example of reading the checkpoint back: the statement sits inside a
# string literal, so it is never executed. Remove the surrounding triple quotes
# to load 'checkpoint.pth'.
"""
ckpt = torch.load('checkpoint.pth')
"""