In [1]:
import time
import numpy as np
import torch
import torchvision
from torch.autograd import Variable
import torchvision.transforms as transforms
import matplotlib.pyplot as plt
from PIL import Image
import torch
import pickle
from keras.datasets import cifar100

# Constants
n_labels = 20
# this should not be a constant, fix!

#image_dimensions = 104


# centre the data
def centre_data(train, validation, test):
    
    # calculate the means for each attribute of the training data
    column_means = np.mean(train, axis=0) 
    
    # centre training data by subtracting training data attribute means
    for i in range(len(train)):
        train[i] = train[i] - column_means
    
    # centre testing data by subtracting training data attribute means
    for x in range(len(test)):
        test[x] = test[x] - column_means
    
    for x in range(len(validation)):
        validation[x] = validation[x] - column_means
        
    return train, validation, test

# apply PCA on the data 
def PCA(variance_target, training_data, validation_data, testing_data):

    U, sigma, Vt = np.linalg.svd(training_data, full_matrices=False)
    
    sum_square_singular = np.sum(sigma**2)
    
    ratios = sigma**2/sum_square_singular
    
                
    n_components = 0
    explained_variance = 0
    
    # determine how many principle components must be retained to maintain the target level of explained variance
    for i in range(len(ratios)):
        if explained_variance >= variance_target:
            break
        else: 
            n_components += 1
            explained_variance += ratios[i]
    
    return training_data.dot(Vt.T[:, :n_components]), testing_data.dot(Vt.T[:, :n_components]), validation_data.dot(Vt.T[:, :n_components])


class Perceptron(torch.nn.Module):
    
    def __init__(self, n_hidden_units, n_hidden_layers, image_dimensions, drop_rate=0.5):
        super(Perceptron, self).__init__()
        
        self.n_hidden_layers = n_hidden_layers
        self.image_dimensions = image_dimensions
    
        
        # set up perceptron layers and add dropout, outputs linear transformation y = Wx + b
        self.linear_function_1 = torch.nn.Linear(self.image_dimensions, n_hidden_units)
        
        # randomly zeroes some of the elements of the input tensor with probability p using samples from a 
        # Bernoulli distribution
        self.linear_function_1_drop = torch.nn.Dropout(drop_rate)
        
        # if number of hidden layers is 2
        if n_hidden_layers == 2:
            
            #TODO: don't know if I need these 
            self.linear_function_2 = torch.nn.Linear(n_hidden_units, n_hidden_units)
            
            self.linear_function_2_drop = torch.nn.Dropout(drop_rate)
            
        if n_hidden_layers == 3:
            
            self.linear_function_3 = torch.nn.Linear(n_hidden_units, n_hidden_units)
            
            self.linear_function_3_drop = torch.nn.Dropout(drop_rate)

        self.output = torch.nn.Linear(n_hidden_units, n_labels)

    # feed forward the data 
    def forward(self, input_data):
        

        input_data = input_data.view(-1, self.image_dimensions)
        
        # input x is passed to fully connected layer, then step function elu is applied, makes it non linear
        input_data = torch.nn.functional.elu(self.linear_function_1(input_data))
        
        input_data = self.linear_function_1_drop(input_data)
        
        if self.n_hidden_layers == 2:
         
            input_data = torch.nn.functional.elu(self.linear_function_2(input_data))
            
            input_data = self.linear_function_2_drop(input_data)
        
        if self.n_hidden_layers == 3:
         
            input_data = torch.nn.functional.elu(self.linear_function_3(input_data))
            
            input_data = self.linear_function_3_drop(input_data)
        
        # It is applied to all slices along dim, and will re-scale them so that 
        # the elements lie in the range [0, 1] and sum to 1
        return torch.nn.functional.log_softmax(self.output(input_data), -1)


def train_model(model, train_loader, optimizer, log_interval=100):
    
    # switch the module mode to .train() so that new weights can be learned after every epoch
    model.train()
    
    num_correct_predictions = 0
    
    # total number of data points 
    num_data_points = len(train_loader.dataset)
    
    for batch, (training_data, training_labels) in enumerate(train_loader):
     
        # sets the gradients to zero before we start back propogation 
        optimizer.zero_grad()
        
        # pass training data into model 
        prediction = model(training_data.float())

        # get the index of the max log-probability
        best_prediction = prediction.data.max(1)[1] 
        
        # determine number of correct predictions 
        num_correct_predictions += (best_prediction.eq(training_labels.data)).sum()
        
        accuracy = num_correct_predictions / num_data_points * 100.00
      
        # calculate negative log likelihood loss
        loss = torch.nn.functional.nll_loss(prediction, training_labels)
        
        # automatically performs the back propogation 
        loss.backward()
        
        # updates the weights accordingly
        optimizer.step()

def validate_model(loss_vector, accuracy_vector, model, validation_loader):
    
    # the common practice for evaluating/validation is using torch.no_grad() 
    # in pair with model.eval() to turn off gradients computation
    model.eval()
    
    loss = 0
    correct_predictions = 0
    
    with torch.no_grad():
        for x_train, y_train in validation_loader:
            
            #x_train, y_train = Variable(x_train), Variable(y_train)
            
            output = model(x_train.float())
            
            # The negative log likelihood loss - (input, target) - 
            loss += torch.nn.functional.nll_loss(output, y_train).data
            
            # get the index of the max log-probability
            best_prediction = output.data.max(1)[1] 
            
            # Compares two tensors element-wise for equality if they are broadcast-compatible; or returns False if they are not broadcast-compatible
            # .sum() Returns the sum of all elements in the input tensor.
            correct_predictions += best_prediction.eq(y_train.data).sum()

    loss /= len(validation_loader)
    
    accuracy = 100. * correct_predictions / len(validation_loader.dataset)

    print('\nValidation set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        loss, correct_predictions, len(validation_loader.dataset), accuracy))
    
def unpickle(file):
    with open(file, 'rb') as dataset:
        cifar_100_dataset = pickle.load(dataset, encoding='bytes')
    return cifar_100_dataset

class Train_Dataset(torch.utils.data.Dataset):

    def __init__(self, preprocessed_training_data, training_coarse_labels, transform):
        
        self.train_labels = training_coarse_labels
        
        self.train_data = preprocessed_training_data
        self.transform = transform
    
    def __len__(self):
        # total number of training samples
        return len(self.train_data)

    def __getitem__(self, index):
        img = self.train_data[index]
        target = self.train_labels[index]
#         img = Image.fromarray(img)

#         if self.transform is not None:
#             img = self.transform(img)
    
        return img, target
    

class Test_Dataset(torch.utils.data.Dataset):

    def __init__(self, preprocessed_testing_data, testing_coarse_labels, transform):
        self.test_labels = testing_coarse_labels
        self.test_data = preprocessed_testing_data
        self.transform = transform
    
    def __len__(self):
        # total number of testing samples 
        return len(self.test_data)

    def __getitem__(self, index):
        img = self.test_data[index]
    
        target = self.test_labels[index]
#         img = Image.fromarray(img)
        
#         if self.transform is not None:
#             img = self.transform(img)

        return img, target
    
def load_data_and_create_dataloaders():
    
    (training_data, training_labels), (testing_data, testing_labels) = (cifar100.load_data("coarse"))
    
#     testing_labels = np.squeeze(testing_labels)
#     training_labels = np.squeeze(training_labels)
    
    # reshape the data 
    training_data = training_data.reshape(50000, 3072)
    testing_data = testing_data.reshape(10000, 3072)

   ## preprocess data
    validation_data = training_data[49000:, :]
    validation_labels = np.squeeze(training_labels[49000:, :])
    training_data = training_data[:49000, :]
    training_labels = np.squeeze(training_labels[:49000, :])
    testing_labels = np.squeeze(testing_labels)
    testing_data = testing_data

#     Centre data
    training_data, validation_data, testing_data = centre_data(training_data, validation_data, testing_data)

#     Apply PCA
    #training_data, testing_data, validation_data = PCA(0.99, training_data, validation_data, testing_data)
    

    
    #reshape the data back to the original shape
    
    print(training_data.shape)
    image_dimensions = training_data.shape[1]
    
    print(testing_data.shape)
    
#     training_data.reshape(49000, 32, 32, 3) 
#     testing_data.reshape(10000, 32, 32, 3) 

#     training_data = training_data.reshape((49000, 3, 32, 32))
#     training_data = training_data.transpose((0, 2, 3, 1))
    
#     testing_data = testing_data.reshape((10000, 3, 32, 32))
#     testing_data = testing_data.transpose((0, 2, 3, 1))
    
    # transformations to be applied to data 
    transform = transforms.Compose([transforms.ToTensor()])
    
    
    # dataset for training model    
    training_set = Train_Dataset(training_data.astype(np.float), training_labels, transform=transform)
    
    

    # data loader for training set 
    train_loader = torch.utils.data.DataLoader(training_set, batch_size=4, shuffle=True, num_workers=0, pin_memory=True)

    # dataset for testing model
    testing_set = Test_Dataset(testing_data.astype(np.float), testing_labels, transform=transform)
    
    # data loader for testing set 
    validation_loader = torch.utils.data.DataLoader(testing_set, batch_size=4, shuffle=False, num_workers=0, pin_memory=True)
    
    return train_loader, validation_loader, image_dimensions
    
def main():
    
    train_loader, validation_loader, image_dimensions = load_data_and_create_dataloaders()
    
    num_hidden_units = 200
    num_hidden_layers = 1
    start_time = time.time()
    
    model = Perceptron(num_hidden_units, num_hidden_layers, image_dimensions)

    # stochastic gradient descent 
    # could try different optimisers here 
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=.75, weight_decay=.0005)

    losses = [] 
    accuracies = []
    
    for iteration in range(1, 20):
        
        train_model(model, train_loader, optimizer)
        
        validate_model(losses, accuracies, model, validation_loader)
        
    total_time = (time.time() - start_time)/60
    
    print("Total time", total_time)


main()

(49000, 3072)
(10000, 3072)

Validation set: Average loss: 78.4150, Accuracy: 500/10000 (5%)



KeyboardInterrupt: 