In [None]:
import numpy as np
import os
import torch
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils, models
import torchvision
from torch.optim import lr_scheduler
import time
import copy
from PIL import Image

In [None]:
class WorksofArt():
    """Map-style dataset of painting images labelled by painter.

    Every regular file found directly inside ``img_dir`` is treated as an image.
    The painter name of an image is the part of its file name that precedes the
    last underscore (``Claude_Monet_12.jpg`` -> ``Claude_Monet``); a file name
    without any underscore therefore gets the empty string as painter name.

    Attributes:
        img_dir: directory the images were listed from.
        transform: optional callable applied to each PIL image in ``__getitem__``.
        img_names: image paths (``img_dir`` joined with the file name), sorted by file name.
        labels: integer class index of every image, aligned with ``img_names``.
        num_painters: number of distinct painter names.
        num_to_painter: dict mapping class index -> painter name.
        painter_to_num: dict mapping painter name -> class index.
    """

    def __init__(self, img_dir='data/resized', transform=None):
        """List ``img_dir`` and build the painter <-> class-index mappings.

        Args:
            img_dir: folder containing the image files.
            transform: optional callable applied to every loaded image.
        """
        self.img_dir = img_dir
        self.transform = transform

        # Keep regular files only: sub-directories (e.g. a Jupyter
        # ``.ipynb_checkpoints`` folder) cannot be opened as images.
        file_names = sorted(
            name for name in os.listdir(img_dir)
            if os.path.isfile(os.path.join(img_dir, name))
        )

        # Painter name = everything before the last underscore of the file name.
        painter_names = [name.rpartition('_')[0] for name in file_names]
        self.img_names = [os.path.join(img_dir, name) for name in file_names]

        # Sort the unique names so that the index assigned to each painter is the
        # same on every run; iterating a set of strings directly yields an order
        # that can change between interpreter sessions.
        unique_painters = sorted(set(painter_names))
        self.num_painters = len(unique_painters)

        self.num_to_painter = dict(enumerate(unique_painters))
        self.painter_to_num = {painter: num for num, painter in self.num_to_painter.items()}

        # Replace every painter name by its class index.
        self.labels = [self.painter_to_num[painter] for painter in painter_names]

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.img_names)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for position ``idx``.

        The image is opened, converted to RGB and, when a transform was given,
        passed through it. ``label`` is the integer class index of the painter.
        """
        # The context manager closes the underlying file; ``convert`` returns an
        # independent, fully loaded image that stays valid afterwards.
        with Image.open(self.img_names[idx]) as source:
            img = source.convert('RGB')

        label = int(self.labels[idx])

        if self.transform:
            img = self.transform(img)
        return img, label
               

In [None]:
# Load dataset
# Every image is resized to 200x200 and converted to a float tensor.
# NOTE(review): the ResNet-18 used further down is loaded with pretrained
# weights; torchvision's pretrained models are documented as expecting
# ImageNet mean/std normalisation. Consider adding transforms.Normalize here
# (the visualisation cell would then need to undo it) -- TODO confirm.
transform = transforms.Compose([
    transforms.Resize((200, 200)),
    transforms.ToTensor()])

dataset = WorksofArt(img_dir='data/resized/', transform=transform)


# split dataset: 80% train, 10% test, the remainder is used for validation
train_size = int(0.8 * len(dataset))
test_size = int(0.1 * len(dataset))
val_size = len(dataset) - train_size - test_size

# Seed the split so that the same images end up in train/val/test on every run;
# otherwise results are not comparable between kernel restarts.
SPLIT_SEED = 42
train_data, val_data, test_data = torch.utils.data.random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED))

In [None]:
# Sanity check: container type of the training split, then the shape of its first image tensor
print(type(train_data))
first_image, _ = train_data[0]
print(first_image.shape)

In [None]:
# Class-index -> painter-name lookup built by the dataset, followed by the number of classes
classes = dataset.num_to_painter
print(classes, len(classes), sep='\n')

In [None]:
# Painter name stored under class index 0
painter_zero = classes[0]
print(painter_zero)

In [None]:
transf = transforms.ToPILImage() # create the transform that can be called to convert the tensor into a PIL Image

num_samples = 4

fig, axes = plt.subplots(1, num_samples, figsize=(20,10))

# plt.subplots returns a bare Axes (not an array) when num_samples == 1;
# np.atleast_1d makes the loop work for any positive number of samples.
for ax in np.atleast_1d(axes):
    rand_idx = np.random.randint(0, len(train_data))
    # Index the dataset once: every access re-opens and re-transforms the image.
    sample_img, sample_label = train_data[rand_idx]

    ax.imshow(transf(sample_img))    # call the transform on the tensor
    ax.set_title("Painter: " + classes[sample_label].replace('_', ' '), fontsize=20)

plt.show()

In [None]:
# creating dataloaders
batch_size = 64

# create training data loader (reshuffled every epoch)
train_loader = torch.utils.data.DataLoader(dataset=train_data, batch_size=batch_size, shuffle=True)

# Validation and test data are only evaluated, never trained on, so they are
# iterated in a fixed order: shuffling brings no benefit there and makes
# per-batch evaluation output differ from run to run.

# create validation data loader
val_loader = torch.utils.data.DataLoader(dataset=val_data, batch_size=batch_size, shuffle=False)

# create test data loader
test_loader = torch.utils.data.DataLoader(dataset=test_data, batch_size=batch_size, shuffle=False)

In [None]:
# Ask PyTorch to release its unused cached GPU memory, then pick the compute device:
# CUDA when it is available, CPU otherwise.
torch.cuda.empty_cache()
use_cuda = torch.cuda.is_available()
if use_cuda:
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using gpu: ", use_cuda)

In [None]:
def train(model, criterion, optimizer, scheduler, epochs=8,
          dataloaders=None, device=None, plot=True):
    """Train ``model`` and return it loaded with its best validation weights.

    Each epoch runs a training pass followed by a validation pass. The weights
    of the epoch with the highest validation accuracy are kept and loaded back
    into the model before it is returned.

    Args:
        model: the ``torch.nn.Module`` to optimise (modified in place).
        criterion: loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: optimizer holding the model parameters.
        scheduler: learning-rate scheduler, stepped once after every training pass.
        epochs: number of epochs to run.
        dataloaders: optional dict with ``'train'`` and ``'val'`` loaders yielding
            ``(inputs, labels)`` batches. Defaults to the notebook-level
            ``train_loader`` / ``val_loader``.
        device: device the batches are moved to. Defaults to the device of the
            model's first parameter (CPU if the model has no parameters).
        plot: when True, draw the per-epoch train/validation loss curves.

    Returns:
        The same ``model`` object, holding the best validation weights.
    """
    if dataloaders is None:
        # Fall back to the loaders created earlier in the notebook.
        dataloaders = {'train': train_loader, 'val': val_loader}
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    start_time = time.time()

    best_parameters = copy.deepcopy(model.state_dict())
    best_val_acc = 0.0
    best_epoch = 0
    # Mean loss per epoch for each phase, stored as plain floats: keeping the
    # loss tensors themselves would hold on to their autograd graphs (and GPU
    # memory) and they cannot be handed to matplotlib.
    loss_history = {'train': [], 'val': []}

    for epoch in range(epochs):

        print('Epoch: {}'.format(epoch))
        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            is_training = phase == 'train'
            if is_training:
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0
            samples_seen = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # forward; gradients are tracked only while training
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if is_training:
                        optimizer.zero_grad()  # reset gradients attribute to zero
                        loss.backward()
                        optimizer.step()

                # statistics: weight the batch-mean loss by the batch size so a
                # smaller final batch does not skew the epoch average
                n_batch = inputs.size(0)
                running_loss += loss.item() * n_batch
                running_corrects += (preds == labels).sum().item()
                samples_seen += n_batch

            if is_training:
                scheduler.step()

            # Normalise by the number of samples actually iterated rather than
            # by sizes computed elsewhere in the notebook.
            epoch_loss = running_loss / samples_seen
            epoch_acc = running_corrects / samples_seen
            loss_history[phase].append(epoch_loss)

            print('[{}] -> Loss: {:.4f} Acc: {:.4f}'.format(
                phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'val' and epoch_acc > best_val_acc:
                best_val_acc = epoch_acc
                best_parameters = copy.deepcopy(model.state_dict())
                best_epoch = epoch

        print('-' * 20)

    time_elapsed = time.time() - start_time
    print()
    print('Training completed in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))

    print('Best val Acc: {:4f} , occured in epoch: {}'.format(
        best_val_acc, best_epoch))

    # load best model weights
    model.load_state_dict(best_parameters)

    if plot:
        # One point per epoch and one curve per phase, matching the axis labels.
        plt.figure()
        plt.ylabel('Cost')
        plt.xlabel('Epoch')
        for phase, losses in loss_history.items():
            plt.plot(losses, label=phase)
        plt.legend()

    return model

We're going to use a pre-trained ResNet-18 model (described in https://arxiv.org/pdf/1512.03385.pdf),
taking advantage of its learned parameters and fine-tuning the model on our dataset.

In [None]:
learning_rate = 0.0001
# Adam's weight_decay is an L2 penalty added to every gradient. The previous
# value of 1 makes that penalty dwarf the loss gradient and steadily pulls the
# pretrained weights toward zero; 1e-4 is a conventional mild regularisation.
weight_decay = 1e-4

# load the model with pretrained weights
# NOTE(review): newer torchvision releases deprecate `pretrained=True` in favour
# of the `weights=` argument -- switch once the installed version is confirmed.
ResNet18 = models.resnet18(pretrained=True)

# Replace the fully-connected layer so it outputs one score per painter class
num_ftrs = ResNet18.fc.in_features
ResNet18.fc = torch.nn.Linear(num_ftrs, len(classes))

ResNet18 = ResNet18.to(device)

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(ResNet18.parameters(), lr=learning_rate, weight_decay=weight_decay)

In [None]:
# schedule the learning rate
# step_size: number of epochs between two decays
# step_size = 1, after every 1 epoch, new_lr = lr*gamma
# step_size = 2, after every 2 epoch, new_lr = lr*gamma

# gamma = decaying factor
# Decay LR by a factor of 0.1 every 7 epochs
# The scheduler is built through the fully-qualified module path because the
# variable below reuses the name of the imported `lr_scheduler` module: with
# `lr_scheduler.StepLR(...)` a second run of this cell would look up `StepLR`
# on the scheduler instance and fail. The variable name is kept because the
# training cell passes `lr_scheduler` to train().
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [None]:
# Fine-tune for 28 epochs; train() returns the model it was given, so the name is rebound to it
ResNet18 = train(
    model=ResNet18,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=lr_scheduler,
    epochs=28,
)