# Import and variable definition

In [1]:
import os
import csv
import math
import numpy as np
import pandas as pd
from PIL import Image
import torch
from torch import nn, optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms, models
from skimage import io, transform
from skimage.color import rgba2rgb
from collections import OrderedDict
from kymatio import Scattering2D
import kymatio.datasets as scattering_datasets
from sklearn.decomposition import PCA
from sklearn.svm import SVC

TRAIN = 'train'
VAL = 'val'
PRED = 'pred'

use_cuda = True
device = torch.device('cuda' if use_cuda else 'cpu')
print ('Using GPU:', use_cuda)

Using GPU: True


# Load images and labels, preprocessing, and convert to DataFrame

In [2]:
# get painting labels
labels = {}

with open('Project1-Raphael/label.csv') as csv_file:
    csv_reader = csv.reader(csv_file, delimiter='.')
    for row in csv_reader:
        labels[row[0]] = row[1].strip()


# load images of painting and construct into dataframe
image_extensions = ['.TIF', '.tif', '.tiff', '.jpg']
data_set = []

for file in os.listdir('Project1-Raphael'):
    extension = os.path.splitext(file)[1]
    
    if extension in image_extensions:
        
        filename = os.path.splitext(file)[0]
        image = io.imread(os.path.join('Project1-Raphael', file))
        if image.shape[2] == 4:
            image = rgba2rgb(image)
        image = np.array(image)
        if image.max() <= 1:
            image = (image * 255).astype(np.uint8)
        if image.max() > 255:
            image = (image / 255).astype(np.uint8)
        else:
            image_id = filename.split(' ')[0].replace('.', '')
            label = labels[image_id]

            if label == 'Raphael':
                data_set.append({'Image': image, 'Disputed': 0, 'Raphael': 1, 'ID':image_id})
            elif label == 'Not Raphael':
                data_set.append({'Image': image, 'Disputed': 0, 'Raphael': 0, 'ID':image_id})
            else:
                data_set.append({'Image': image, 'Disputed': 1, 'Raphael': -1, 'ID':image_id})

df = pd.DataFrame(data_set)
training_df = df[df['Disputed'] == 0]
validation_df = df[df['Disputed'] == 0]
prediction_df = df[df['Disputed'] == 1]
dataframes = {'train': training_df, 'val': training_df, 'pred':prediction_df}

# Define dataloader

In [3]:
class RaphaelPaintingsDataset(Dataset):

    def __init__(self, df, transform=None):
        """
        Args:
            df (DataFrame): A dataframe containing painting image and painted by Raphael label.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        
        self.image = df['Image'].values
        self.raphael = df['Raphael'].values
        self.data_id = df['ID'].values
        self.transform = transform

    def __len__(self):
        return len(self.raphael)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()
        
        image = Image.fromarray(self.image[idx])
        if self.transform:
            image = self.transform(image)
        label = torch.tensor(self.raphael[idx], dtype=torch.int64)
        data_id = self.data_id[idx]
        
        sample = {'image': image, 'label': label, 'id': data_id}
        return sample
    

data_transforms = {
    TRAIN: transforms.Compose([
        transforms.Resize(512),
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
    ]),
    VAL: transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
    ]),
    PRED: transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
    ])
}

datasets = {
    x: RaphaelPaintingsDataset(dataframes[x], data_transforms[x])
    for x in [TRAIN, VAL, PRED]
}

dataloaders = {
    x: torch.utils.data.DataLoader(datasets[x], batch_size=12, shuffle=True, num_workers=2)
    for x in [TRAIN, VAL, PRED]
}

# Define train and validate function

In [4]:
def train(model, device, train_loader, optimizer, scattering=None):
    model.train()
    train_loss = 0
    train_accuracy = 0
    
    for samples in train_loader:
        images, labels = samples['image'], samples['label']
        images, labels = images.to(device), labels.to(device)
        
        optimizer.zero_grad()
        
        if scattering:
            output = model(scattering(images))
        else:
            output = model.forward(images)
        
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        
        ps = torch.exp(output)
        equality = (labels.data == ps.max(dim=1)[1])
        train_accuracy += equality.type(torch.FloatTensor).mean()
        
    return train_loss, train_accuracy
        
def validate(model, device, validate_loader, scattering=None):
    model.eval()
    validate_loss = 0
    validate_accuracy = 0
    
    with torch.no_grad():
        for samples in validate_loader:
            images, labels = samples['image'], samples['label']
            images, labels = images.to(device), labels.to(device)
            
            if scattering:
                output = model(scattering(images))
            else:
                output = model.forward(images)
            
            validate_loss += criterion(output, labels).item()
            
            ps = torch.exp(output)
            equality = (labels.data == ps.max(dim=1)[1])
            validate_accuracy += equality.type(torch.FloatTensor).mean()
            
    return validate_loss, validate_accuracy

# Invariant scattering networks

In [5]:
scattering = Scattering2D(J=2, shape=(224, 224))
K = 81*3
if use_cuda:
    scattering = scattering.cuda()


class View(nn.Module):
    def __init__(self, *args):
        super(View, self).__init__()
        self.shape = args

    def forward(self, x):
        return x.view(-1,*self.shape)


ScatterMLP = nn.Sequential(View(K, 56, 56),
                           nn.BatchNorm2d(K),
                           View(K*56*56),
                           nn.Linear(K*56*56, 512),
                           nn.ReLU(),
                           nn.Linear(512, 256),
                           nn.ReLU(),
                           nn.Linear(256, 2),
                           nn.LogSoftmax(dim=1))
ScatterMLP.to(device)

for m in ScatterMLP.modules():
    if isinstance(m, nn.Linear):
        m.weight.data.normal_(0, 2./math.sqrt(m.in_features))
        m.bias.data.zero_()


# model training
epochs = 100
criterion = nn.NLLLoss()
optimizer = optim.Adam(ScatterMLP.parameters(), lr=0.001)

for e in range(epochs):
    
    train_loss, train_accuracy = train(ScatterMLP, device, dataloaders[TRAIN], optimizer, scattering)
    validate_loss, validate_accuracy = validate(ScatterMLP, device, dataloaders[VAL], scattering) 
    
    if (e+1) % 10 == 0:
        print("Epoch: {}/{}.. ".format(e+1, epochs),
              "Train (Loss: {:.4f}".format(train_loss/len(dataloaders[TRAIN])),
              "Accuracy: {:.3f}".format(train_accuracy/len(dataloaders[TRAIN])),
              ")  Validate (Loss: {:.3f}".format(validate_loss/len(dataloaders[VAL])),
              "Accuracy: {:.2f}".format(validate_accuracy/len(dataloaders[VAL])),
              ")")

Epoch: 10/100..  Train (Loss: 262.5260 Accuracy: 0.812 )  Validate (Loss: 14.533 Accuracy: 0.71 )
Epoch: 20/100..  Train (Loss: 6.0882 Accuracy: 0.875 )  Validate (Loss: 11.019 Accuracy: 0.85 )
Epoch: 30/100..  Train (Loss: 43.9822 Accuracy: 0.750 )  Validate (Loss: 19.900 Accuracy: 0.90 )
Epoch: 40/100..  Train (Loss: 65.9122 Accuracy: 0.833 )  Validate (Loss: 2.420 Accuracy: 0.90 )
Epoch: 50/100..  Train (Loss: 49.8260 Accuracy: 0.792 )  Validate (Loss: 46.965 Accuracy: 0.92 )
Epoch: 60/100..  Train (Loss: 220.2227 Accuracy: 0.771 )  Validate (Loss: 9.674 Accuracy: 0.90 )
Epoch: 70/100..  Train (Loss: 26.9802 Accuracy: 0.812 )  Validate (Loss: 59.389 Accuracy: 0.75 )
Epoch: 80/100..  Train (Loss: 25.5899 Accuracy: 0.875 )  Validate (Loss: 4.710 Accuracy: 0.96 )
Epoch: 90/100..  Train (Loss: 41.3156 Accuracy: 0.917 )  Validate (Loss: 32.981 Accuracy: 0.83 )
Epoch: 100/100..  Train (Loss: 99.7101 Accuracy: 0.750 )  Validate (Loss: 29.084 Accuracy: 0.85 )


# Transfer learning using pretrained vgg19

In [6]:
# model definition
vgg19 = models.vgg19(pretrained=True)

for param in vgg19.parameters():
    param.requires_grad = False

classifier = nn.Sequential(OrderedDict([('fc1', nn.Linear(25088, 1024)),
                                        ('relu1', nn.ReLU()),
                                        ('fc2', nn.Linear(1024,2)),
                                        ('output', nn.LogSoftmax(dim=1))
                                       ]))

vgg19.classifier = classifier
vgg19.to(device)


# model training
epochs = 20
criterion = nn.NLLLoss()
optimizer = optim.Adam(vgg19.classifier.parameters(), lr=0.001)

for e in range(epochs):
    
    train_loss, train_accuracy = train(vgg19, device, dataloaders[TRAIN], optimizer)
    validate_loss, validate_accuracy = validate(vgg19, device, dataloaders[VAL]) 
            
    print("Epoch: {}/{}.. ".format(e+1, epochs),
          "Train (Loss: {:.4f}".format(train_loss/len(dataloaders[TRAIN])),
          "Accuracy: {:.3f}".format(train_accuracy/len(dataloaders[TRAIN])),
          ")  Validate (Loss: {:.3f}".format(validate_loss/len(dataloaders[VAL])),
          "Accuracy: {:.2f}".format(validate_accuracy/len(dataloaders[VAL])),
          ")")

Epoch: 1/20..  Train (Loss: 2.6310 Accuracy: 0.417 )  Validate (Loss: 3.175 Accuracy: 0.58 )
Epoch: 2/20..  Train (Loss: 3.3421 Accuracy: 0.604 )  Validate (Loss: 2.445 Accuracy: 0.60 )
Epoch: 3/20..  Train (Loss: 1.6424 Accuracy: 0.708 )  Validate (Loss: 0.145 Accuracy: 0.90 )
Epoch: 4/20..  Train (Loss: 0.3526 Accuracy: 0.833 )  Validate (Loss: 2.019 Accuracy: 0.62 )
Epoch: 5/20..  Train (Loss: 1.5856 Accuracy: 0.729 )  Validate (Loss: 0.324 Accuracy: 0.83 )
Epoch: 6/20..  Train (Loss: 0.2674 Accuracy: 0.854 )  Validate (Loss: 0.180 Accuracy: 0.94 )
Epoch: 7/20..  Train (Loss: 0.1174 Accuracy: 0.958 )  Validate (Loss: 0.334 Accuracy: 0.92 )
Epoch: 8/20..  Train (Loss: 0.4625 Accuracy: 0.792 )  Validate (Loss: 0.279 Accuracy: 0.96 )
Epoch: 9/20..  Train (Loss: 0.3809 Accuracy: 0.854 )  Validate (Loss: 0.065 Accuracy: 0.96 )
Epoch: 10/20..  Train (Loss: 0.1934 Accuracy: 0.938 )  Validate (Loss: 0.019 Accuracy: 1.00 )
Epoch: 11/20..  Train (Loss: 0.0529 Accuracy: 1.000 )  Validate (Loss

# Prediction for ScatterNet and vgg19

In [21]:
def predict(model, device, predict_loader, scattering=None):
    model.eval()
    predict = {}
    
    with torch.no_grad():
        for samples in predict_loader:
            images = samples['image']
            images = images.to(device)
            
            if scattering:
                output = model(scattering(images))
            else:
                output = model.forward(images)
            
            ps = torch.exp(output).detach().cpu().numpy().astype(np.uint8)
            data_id = samples['id']
            
            for i in range(len(data_id)):
                predict[data_id[i]] = ps[i][0]
    
    predict = dict(sorted(predict.items()))
    return predict


scatter_net_predict = predict(ScatterMLP, device, dataloaders[PRED], scattering)
print ("Prediction by scattering MLP model:\n", scatter_net_predict)

vgg19_predict = predict(vgg19, device, dataloaders[PRED])
print ("Prediction by vgg16 model:\n", vgg19_predict)

Prediction by scattering MLP model:
 {'1': 0, '10': 1, '20': 1, '23': 1, '25': 0, '26': 1, '7': 1}
Prediction by vgg16 model:
 {'1': 0, '10': 0, '20': 0, '23': 0, '25': 0, '26': 0, '7': 0}


# Analysis 1

Comparing the first two feature extraction approches, the deep learning model perform much better than the scattering net model. Using transfoer learning on pretrained vgg19, we see a fast and study decrease in both traing and validation accuracy, acheaving 100% in a few epoch. Note the validation accuracy is higher than traing accuracy because with random crop and random flip, the traing set is much harder than the validation and prediction set. This is data augmentation to improve model training. In this part, we can say the deep learning model is better and should give a more accurate prediction.

# Data preprocessing for Sklearn

In [None]:
for index, row in df.iterrows():
    image_set = 

# Classical unsupervised learning: PCA

In [None]:
pca = PCA(n_components=3)
pca_image_set = pca.fit_transform(image_set)

# Traditional supervised learning: SVM