In [None]:
from __future__ import print_function
import argparse
import os
import math
import numpy as np
import numpy.random as npr
import scipy.misc
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import pickle
import argparse


In [None]:
DATA_PATH = '/content/gdrive/MyDrive/Colab Notebooks/hw3/art_data'
DATA_FILE = DATA_PATH + 'art_data.pickle'
IMAGE_SIZE = 50
NUM_CHANNELS = 3
NUM_LABELS = 11
INCLUDE_TEST_SET = False

In [None]:
pwd

In [None]:
#@title
# Hyperparameters
batch_size = 10
learning_rate = 0.01
num_training_steps = 1501
# Enable dropout and weight decay normalization
dropout_prob = 0.0 # set to > 0.0 to apply dropout, 0.0 to remove
weight_penalty = 0.0 # set to > 0.0 to apply weight penalty, 0.0 to remove

class CNN(nn.Module):
    def _init_(self):
        super(CNN, self)._init_()

        # Network Architecture Parameters
        ############################################################################
        layer1_filter_size = 5
        layer1_depth = 16
        layer1_stride = 2
        layer2_filter_size = 5
        layer2_depth = 16
        layer2_stride = 2
        layer3_num_hidden = 64

        # Add max pooling
        self.pooling = False
        layer1_pool_filter_size = 2
        layer1_pool_stride = 2
        layer2_pool_filter_size = 2
        layer2_pool_stride = 2


        ############################################################################

        # Define Layers
        #########################################################################################################
        self.conv1 = nn.Conv2d(NUM_CHANNELS, layer1_depth, kernel_size=layer1_filter_size, stride = layer1_stride, padding=layer1_filter_size//2)
        self.relu1 = nn.ReLU()
        self.pooling1 = nn.MaxPool2d(kernel_size = layer1_pool_filter_size, stride = layer1_pool_stride)

        self.conv2 = nn.Conv2d(layer1_depth, layer2_depth, kernel_size=layer2_filter_size, stride = layer2_stride, padding=layer2_filter_size//2)
        self.relu2 = nn.ReLU()
        self.pooling2 = nn.MaxPool2d(kernel_size = layer2_pool_filter_size, stride = layer2_pool_stride)


        feature_size = int(math.ceil(IMAGE_SIZE/(layer1_stride*layer2_stride))) if not self.pooling else \
                            int(math.ceil(IMAGE_SIZE/(layer1_stride*layer2_stride*layer2_pool_stride*layer2_pool_stride)))
        self.feature_in = feature_size**2*layer2_depth
        self.fc = nn.Linear(self.feature_in,layer3_num_hidden )
        self.fc_relu = nn.ReLU()
        self.fc_dropout = nn.Dropout(p=dropout_prob)

        self.out = nn.Linear(layer3_num_hidden,NUM_LABELS)
        #########################################################################################################

    # define data propagation through the layers
    def forward(self, img):
        # reshape input tensor from Batch x Height x Width x Channles(3)
        # to Batch x Channles(3)x Height x Width
        img =img.permute(0,3,1,2)

        self.out1 = self.conv1(img)
        self.out1 = self.relu1(self.out1)
        if self.pooling:
            self.out1 = self.pooling1(self.out1)

        self.out2 = self.conv2(self.out1)
        self.out2 = self.relu2(self.out2)
        if self.pooling:
            self.out2 = self.pooling2(self.out2)

        # reshape the convolution feature map to a vector of size C*H*W
        B,C,H,W = self.out2.shape
        self.out2 = self.out2.reshape(B,C*H*W)


        self.out3 = self.fc(self.out2)
        self.out3 = self.fc_relu(self.out3)
        self.out3 = self.fc_dropout(self.out3)

        self.out_final = self.out(self.out3)
        self.out_final = F.softmax(self.out_final, dim=1)

        return self.out_final

def get_torch_vars(xs, ys, gpu=False):
    xs = torch.from_numpy(xs).float()
    ys = torch.from_numpy(ys).long()
    if gpu:
        xs = xs.cuda()
        ys = ys.cuda()
    return Variable(xs), Variable(ys)

def load_data():
    print ("Loading datasets...")

    with open(DATA_FILE, 'rb') as f:
        save = pickle.load(f,encoding='bytes')
        train_X = save[b'train_data']
        train_Y = save[b'train_labels']
        val_X = save[b'val_data']
        val_Y = save[b'val_labels']
        print ('Training set', train_X.shape, train_Y.shape)
        print ('Validation set', val_X.shape, val_Y.shape)

        if INCLUDE_TEST_SET:
            test_X = save[b'test_data']
            test_Y = save[b'test_labels']
            print ('Test set', test_X.shape, test_Y.shape)
            del save
            return (train_X,train_Y),(val_X,val_Y),(test_X,test_Y)


        del save  # hint to help gc free up memory

    return (train_X,train_Y),(val_X,val_Y)





def load_invariance_datasets():
    with open(DATA_PATH + 'invariance_art_data.pickle', 'rb') as f:
        save = pickle.load(f,encoding='bytes')
        translated_val_X = save[b'translated_val_data']
        flipped_val_X = save[b'flipped_val_data']
        inverted_val_X = save[b'inverted_val_data']
        bright_val_X = save[b'bright_val_data']
        dark_val_X = save[b'dark_val_data']
        high_contrast_val_X = save[b'high_contrast_val_data']
        low_contrast_val_X = save[b'low_contrast_val_data']
        del save
    return [translated_val_X,flipped_val_X,inverted_val_X,
                bright_val_X,dark_val_X,high_contrast_val_X,low_contrast_val_X]

def run_validation_step(cnn, criterion, val_x, val_y, batch_size, gpu=False):
    correct = 0.0
    total = 0.0
    losses = []
    for batch in range(0,len(val_x),batch_size):
        images = val_x[batch:batch+batch_size, :,:,:]
        labels = val_y[batch:batch+batch_size,:]
        images, labels = get_torch_vars(images, labels, gpu)
        labels = torch.argmax(labels, dim=1)
        preds = model(images)

        val_loss = criterion(preds,labels)
        losses.append(val_loss.item())

        _, predicted = torch.max(preds, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum()

    assert total == len(val_y), f'total {total} should be == to {len(val_y)}'
    val_loss = np.mean(losses)
    val_acc = 100 * correct / total
    return val_loss, val_acc

def train(model, train_x, train_y,val_x, val_y, criterion, gpu=False ):

    print("Beginning training ...")
    if gpu: model.cuda()
    start = time.time()

    train_losses = []
    valid_losses = []
    valid_accs = []
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, weight_decay = weight_penalty)
    for step in range(num_training_steps):
        # Train the Model
        model.train() # Change model to 'train' mode
        losses = []
        # get a batch of data
        offset = (step * batch_size) % (train_y.shape[0] - batch_size)
        images = train_x[offset:(offset + batch_size), :, :, :]
        labels = train_y[offset:(offset + batch_size), :]
        # get tensors to feed to the model
        images, labels = get_torch_vars(images, labels, gpu)
        labels = torch.argmax(labels, dim=1)
        # Forward + Backward + Optimize
        optimizer.zero_grad()
        preds = model(images)

        loss  = criterion(preds,labels)

        loss.backward()
        optimizer.step()
        losses.append(loss.item())

        if (step+1)%100 == 0:
            # Change model to 'eval' mode
            model.eval()

            avg_loss = np.mean(losses)
            train_losses.append(avg_loss)

            # Evaluate the model
            val_loss, val_acc = run_validation_step(model,criterion,val_x, val_y,batch_size, gpu)

            time_elapsed = time.time() - start
            valid_losses.append(val_loss)
            valid_accs.append(val_acc)
            print(f' Step {step+1}/{num_training_steps}, Loss: {avg_loss:.4f} Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.1f}, Time(s):{time_elapsed:.2f}')

    return model


if __name__ == '_main_':

    parser = argparse.ArgumentParser(description="Train Artists")

    parser.add_argument('--gpu', action='store_true', default=False,
                        help="Use GPU for training")
    parser.add_argument('--invariance', action="store_true", default=False,
                        help="Validate on invariance data")

    
    args = parser.parse_args()
   
    criterion = nn.CrossEntropyLoss()
    model = CNN()



    (train_X,train_Y),(val_X,val_Y) = load_data()
    model = train(model, train_X,train_Y,val_X,val_Y,criterion, gpu=True)
    invariance=False
    if not invariance:
        model.eval()
        val_loss, val_acc = run_validation_step(model,criterion,val_X,val_Y,batch_size, True)
        print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.1f}')
    else:
        invariance_sets = load_invariance_datasets()
        sets = [val_X]+invariance_sets
        set_names = ['normal validation', 'translated', 'brightened', 'darkened',
                     'high contrast', 'low contrast', 'flipped', 'inverted']
        for i in range(len(set_names)):
            model.eval()
            _, val_acc = run_validation_step(model,criterion,sets[i],val_Y,batch_size, True)
            print(f'Accuracy on {set_names[i]} Acc: {val_acc:.1f}')