# In [1]:
import torch
from torch import  nn
from torch.nn import functional as F
from torch import optim
from torch import Tensor
from torch import nn


# Number of features in one flattened 14x14 single-channel image.
IMAGE_SIZE = 14 * 14
# Number of output scores produced by each classifier.
NUM_CLASSES = 10

class ShallowFCNet(nn.Module):
    """Fully connected classifier with a single 120-unit hidden layer.

    Args:
        dropout: drop probability applied to the hidden activations.
    """

    def __init__(self, dropout = 0):
        super().__init__()
        hidden_units = 120
        self.fc1 = nn.Linear(IMAGE_SIZE, hidden_units)
        self.fc2 = nn.Linear(hidden_units, NUM_CLASSES)
        self.drop = nn.Dropout(dropout)
        self.name = f"ShallowFCNet, dropout = {dropout}"

    def forward(self, x):
        """Flatten ``x`` to ``IMAGE_SIZE`` features and return the class scores."""
        flat = x.view(-1, IMAGE_SIZE)
        hidden = self.drop(F.relu(self.fc1(flat)))
        return self.fc2(hidden)

class DeepFCNet(nn.Module):
    """Fully connected classifier whose hidden widths first double, then halve.

    The first half of the hidden layers doubles the width starting from
    ``IMAGE_SIZE``; the second half halves it again, so the last hidden layer
    is ``IMAGE_SIZE`` wide. A final linear layer maps to ``NUM_CLASSES`` scores.

    Args:
        nb_layers: requested number of hidden layers. An odd value is reduced
            by one so that the doubling and halving halves have equal length.
            ``self.name`` keeps the requested value.
        dropout: drop probability applied after every hidden activation.
    """

    def __init__(self, nb_layers=4, dropout = 0):
        super().__init__()
        self.layers = nn.ModuleList([])
        self.name = f"DeepFCNet({nb_layers})"
        self.drop = nn.Dropout(dropout)

        if nb_layers % 2 != 0:
            nb_layers = nb_layers - 1

        width = IMAGE_SIZE
        half = nb_layers // 2
        for index in range(nb_layers):
            next_width = width * 2 if index < half else width // 2
            self.layers.append(nn.Linear(width, next_width))
            width = next_width
        # ``width`` is back to IMAGE_SIZE here because both halves have equal length.
        self.layers.append(nn.Linear(width, NUM_CLASSES))

    def forward(self, x):
        """Flatten ``x`` to ``IMAGE_SIZE`` features and return the class scores."""
        # Flatten once up front so the output layer also receives a 2-D input
        # when the network has no hidden layer at all.
        x = x.view(-1, IMAGE_SIZE)
        *hidden_layers, output_layer = self.layers
        for layer in hidden_layers:
            x = self.drop(F.relu(layer(x)))
        return output_layer(x)

class BasicCNN(nn.Module):
    """Three 4x4 convolutions followed by three fully connected layers.

    The input is reshaped to 1x14x14; each unpadded 4x4 convolution removes
    three pixels per side length (14 -> 11 -> 8 -> 5), which gives the
    ``32*5*5`` features consumed by ``fc1``.

    Args:
        dropout: drop probability applied after the two hidden dense layers.
    """

    def __init__(self, dropout = 0):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=4)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=4)
        self.conv3 = nn.Conv2d(16, 32, kernel_size=4)
        self.fc1 = nn.Linear(32*5*5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)
        self.drop = nn.Dropout(dropout)
        self.name = f"BasicCNN(dropout = {dropout})"

    def forward(self, x):
        """Return the 10 class scores for a batch of 14x14 images."""
        feats = x.view(-1, 1, 14, 14)
        for conv in (self.conv1, self.conv2, self.conv3):
            feats = F.relu(conv(feats))
        feats = feats.view(-1, 32*5*5)
        for dense in (self.fc1, self.fc2):
            feats = self.drop(F.relu(dense(feats)))
        return self.fc3(feats)

class BasicCNN_bn(nn.Module):
    """Three 4x4 convolutions, each followed by batch normalization, then three dense layers.

    The input is reshaped to 1x14x14 and the unpadded convolutions shrink it
    to 5x5 with 64 channels before the dense layers.

    Args:
        dropout: drop probability applied after the two hidden dense layers.
    """

    def __init__(self, dropout = 0):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=4)
        self.conv1_bn = nn.BatchNorm2d(6)
        self.conv2 = nn.Conv2d(6, 32, kernel_size=4)
        self.conv2_bn = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=4)
        self.conv3_bn = nn.BatchNorm2d(64)
        self.fc1 = nn.Linear(64*5*5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)
        self.drop = nn.Dropout(dropout)
        self.name = f"BasicCNN with batch normalization, dropout = {dropout}"

    def forward(self, x):
        """Return the 10 class scores for a batch of 14x14 images."""
        feats = x.view(-1, 1, 14, 14)
        stages = (
            (self.conv1, self.conv1_bn),
            (self.conv2, self.conv2_bn),
            (self.conv3, self.conv3_bn),
        )
        for conv, norm in stages:
            feats = F.relu(norm(conv(feats)))
        feats = feats.view(-1, 64*5*5)
        for dense in (self.fc1, self.fc2):
            feats = self.drop(F.relu(dense(feats)))
        return self.fc3(feats)

class LeNet4(nn.Module):
    """Two convolution + max-pool stages followed by two dense layers.

    The first convolution pads the 14x14 input by 9 pixels, so the feature
    maps go 14 -> 28 -> (pool) 14 -> 10 -> (pool) 5, giving ``16*5*5``
    features for ``fc1``.

    Args:
        dropout: drop probability applied after the hidden dense layer.
    """

    def __init__(self, dropout = 0):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 4, kernel_size=5, padding = 9)
        self.conv2 = nn.Conv2d(4, 16, kernel_size=5)
        self.fc1 = nn.Linear(16*5*5, 120)
        self.fc2 = nn.Linear(120, 10)
        self.drop = nn.Dropout(dropout)
        self.name = f"LeNet4, dropout = {dropout}"

    def forward(self, x):
        """Return the 10 class scores for a batch of 14x14 images."""
        feats = x.view(-1, 1, 14, 14)
        for conv in (self.conv1, self.conv2):
            feats = F.max_pool2d(F.relu(conv(feats)), kernel_size=2, stride=2)
        hidden = self.drop(F.relu(self.fc1(feats.view(-1, 400))))
        return self.fc2(hidden)

class LeNet5(nn.Module):
    """Two convolution + max-pool stages followed by three dense layers.

    The first convolution pads the 14x14 input by 9 pixels, so the feature
    maps go 14 -> 28 -> (pool) 14 -> 10 -> (pool) 5, giving ``16*5*5``
    features for ``fc1``.

    Args:
        dropout: drop probability applied after the two hidden dense layers.
    """

    def __init__(self, dropout = 0):
        super(LeNet5, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=5, padding = 9)
        self.conv2 = nn.Conv2d(6, 16, kernel_size=5)
        self.fc1 = nn.Linear(16*5*5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)
        self.drop = nn.Dropout(dropout)
        # The label used in result tables; it previously read "LeNet45" by mistake.
        self.name = f"LeNet5, dropout = {dropout}"

    def forward(self, x):
        """Return the 10 class scores for a batch of 14x14 images."""
        x = self.conv1(x.view(-1, 1, 14, 14))
        x = F.relu(x)
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = self.conv2(x)
        x = F.relu(x)
        x = F.max_pool2d(x, kernel_size=2, stride=2)
        x = F.relu(self.fc1(x.view(-1, 400)))
        x = self.drop(x)
        x = F.relu(self.fc2(x))
        x = self.drop(x)
        x = self.fc3(x)

        return x

class ResBlock(nn.Module):
    """Residual block: two 4x4 convolutions plus a pooled 1x1-convolution shortcut.

    The main path maps 6 -> 32 -> 64 channels and removes 6 pixels per side.
    The shortcut maps 6 -> 64 channels with a 1x1 convolution and is reduced
    with both average and max pooling (kernel 2); both pooled branches share
    one batch-norm layer and are added to the main path. The shapes line up
    for the 11x11 feature maps this block is used with (11 -> 5 on both paths).

    Args:
        dropout: stored in ``self.drop``; ``forward`` does not apply it.
    """

    def __init__(self, dropout = 0):
        super(ResBlock, self).__init__()
        self.conv2 = nn.Conv2d(6, 32, kernel_size=4)
        self.conv2_bn = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=4)
        self.conv3_bn = nn.BatchNorm2d(64)
        self.drop = nn.Dropout(dropout)
        self.conv2_bis = nn.Conv2d(6, 64, kernel_size=1)
        self.avg = nn.AvgPool2d(kernel_size = 2)
        self.max = nn.MaxPool2d(kernel_size = 2)
        self.conv2_bis_bn = nn.BatchNorm2d(64)

    def forward(self, x):
        """Return ``relu(main_path(x) + avg_shortcut(x) + max_shortcut(x))``."""
        y = F.relu(self.conv2_bn(self.conv2(x)))
        y = self.conv3_bn(self.conv3(y))

        # The 1x1 convolution is identical for both shortcut branches, so it is
        # evaluated once and shared instead of being recomputed per branch.
        shortcut = self.conv2_bis(x)
        # Batch norm is applied to the average-pooled branch first, then to the
        # max-pooled one, which keeps the running-statistics update order.
        avg_branch = F.relu(self.conv2_bis_bn(self.avg(shortcut)))
        max_branch = F.relu(self.conv2_bis_bn(self.max(shortcut)))
        y += avg_branch + max_branch
        return F.relu(y)

class ResNet(nn.Module):
    """Convolution stem, one ``ResBlock`` and three dense layers.

    The input is reshaped to 1x14x14; the 4x4 stem convolution yields 6
    channels, the residual block yields the ``64*5*5`` features fed to ``fc1``.

    Args:
        dropout: drop probability passed to the residual block and applied
            after the two hidden dense layers.
    """

    def __init__(self, dropout = 0):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 6, kernel_size=4)
        self.conv1_bn = nn.BatchNorm2d(6)
        self.resblock = ResBlock(dropout)
        self.fc1 = nn.Linear(64*5*5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)
        self.drop = nn.Dropout(dropout)
        self.name = f"Residual network inspired from BasicCNN_bn, dropout = {dropout}"

    def forward(self, x):
        """Return the 10 class scores for a batch of 14x14 images."""
        stem = F.relu(self.conv1_bn(self.conv1(x.view(-1, 1, 14, 14))))
        feats = self.resblock(stem).view(-1, 64*5*5)
        for dense in (self.fc1, self.fc2):
            feats = self.drop(F.relu(dense(feats)))
        return self.fc3(feats)


def _build_sgd(parameters, eta, momentum):
    """Create an SGD optimizer; ``parameters`` is a callable returning the parameters."""
    return optim.SGD(parameters(), eta, momentum = momentum)


def _build_adam(parameters, eta, momentum):
    """Create an Adam optimizer; ``momentum`` is accepted for a uniform signature and ignored."""
    return optim.Adam(parameters(), eta)


# Maps an optimizer name to a factory taking (parameters_callable, eta, momentum).
optimizer_methods = {
    'SGD': _build_sgd,
    'Adam': _build_adam,
}

def train_model(model, train, train_classes, test, test_classes,
                mini_batch_size, eta, criterion, nb_epochs, momentum, optimizer_name):
    """Train ``model`` with mini-batches and track per-epoch metrics.

    Args:
        model: ``nn.Module`` to train in place.
        train, train_classes: training inputs and their class labels.
        test, test_classes: evaluation inputs and their class labels.
        mini_batch_size: number of samples per optimisation step. The last
            batch of an epoch may be smaller when the training set size is not
            a multiple of it.
        eta: learning rate.
        criterion: loss callable taking ``(output, labels)``.
        nb_epochs: number of passes over the training set.
        momentum: momentum handed to the optimizer factory.
        optimizer_name: key into ``optimizer_methods``.

    Returns:
        ``(train_accuracy, test_accuracy, train_loss, test_loss)``, each a
        tensor of length ``nb_epochs``. ``train_loss`` holds the loss of the
        last mini-batch of each epoch.
    """
    train_accuracy = torch.zeros(nb_epochs)
    test_accuracy = torch.zeros(nb_epochs)
    train_loss = torch.zeros(nb_epochs)
    test_loss = torch.zeros(nb_epochs)
    N_train = train.size(0)
    N_test = test.size(0)

    optimizer = optimizer_methods[optimizer_name](model.parameters, eta, momentum)
    # Remember the caller's mode so it can be restored once training is over.
    was_training = model.training

    for epoch in range(nb_epochs):
        model.train()
        correct_train_digits = 0
        for start in range(0, N_train, mini_batch_size):
            # Slicing clamps at the end of the data, so a ragged final batch
            # is handled instead of raising as ``narrow`` would.
            stop = start + mini_batch_size
            batch_classes = train_classes[start:stop]
            output = model(train[start:stop])
            loss = criterion(output, batch_classes)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            correct_train_digits += (batch_classes == output.argmax(1)).sum().item()

        train_loss[epoch] = loss.item()

        # Evaluate with dropout disabled and batch-norm running statistics.
        model.eval()
        with torch.no_grad():
            output = model(test)
            test_loss[epoch] = criterion(output, test_classes).item()
            correct_test_digits = (test_classes == output.argmax(1)).sum().item()

        # compute accuracy
        train_accuracy[epoch] = correct_train_digits / N_train
        test_accuracy[epoch] = correct_test_digits / N_test

    model.train(was_training)
    return train_accuracy, test_accuracy, train_loss, test_loss


def compute_project_accuracy(model, input1, input2, target):
    """Return the fraction of pairs whose predicted ordering matches ``target``.

    For every pair the predicted class of ``input1`` is compared with the
    predicted class of ``input2``; the pair counts as correct when
    ``target`` equals ``predicted1 <= predicted2``.

    Args:
        model: callable mapping a batch of inputs to per-class scores.
        input1, input2: the first and second element of every pair.
        target: one 0/1 (or boolean) label per pair.

    Returns:
        The accuracy as a Python ``float``.
    """
    # Score in inference mode (no dropout, batch-norm running statistics) and
    # put the model back into training mode afterwards if it was in it.
    was_training = getattr(model, "training", False)
    if was_training:
        model.eval()
    try:
        # Only the arg-max is needed, so no autograd graph has to be built.
        with torch.no_grad():
            predicted_classes1 = model(input1).argmax(1)
            predicted_classes2 = model(input2).argmax(1)
    finally:
        if was_training:
            model.train()

    nb_correct_project = (target == (predicted_classes1 <= predicted_classes2)).sum().item()

    return float(nb_correct_project / target.size(0))



def weights_init(m):
    """Re-initialise the weights of a ``nn.Conv2d`` with Xavier-uniform values.

    Any other module type is left untouched; intended for ``model.apply``.
    """
    if not isinstance(m, nn.Conv2d):
        return
    nn.init.xavier_uniform_(m.weight.data)

def _reset_parameters(module):
    """Re-draw the parameters of ``module`` when it knows how to reset itself."""
    reset = getattr(module, "reset_parameters", None)
    if callable(reset):
        reset()


def train_test(model, train, test, train_classes, test_classes,
            train_target, test_target, mini_batch_size, criterion,
             nb_epochs, eta = 1e-2, momentum = 0.9, optimizer_name = 'SGD', repeats = 25):
    """Train ``model`` ``repeats`` times from fresh weights and summarise the runs.

    ``train`` and ``test`` are split in the middle: the first half is taken as
    the first element of every pair and the second half as the second element
    (assumes the caller concatenated them in that order). After each run the
    pair-comparison accuracy is computed with ``compute_project_accuracy``.

    Args:
        model: network exposing a ``name`` attribute.
        train, test: inputs, both pair halves concatenated along dimension 0.
        train_classes, test_classes: class label of every input.
        train_target, test_target: one comparison label per pair.
        mini_batch_size, criterion, nb_epochs, eta, momentum, optimizer_name:
            forwarded to ``train_model``.
        repeats: number of independent training runs.

    Returns:
        A one-element list holding a dict with the configuration, the mean and
        standard deviation of the pair-comparison accuracy over the runs, and
        the per-epoch class accuracy averaged over the runs (as tensors).
    """
    all_results = []

    # Each set is split by its own size so train and test may differ in length.
    n_train_pairs = len(train) // 2
    n_test_pairs = len(test) // 2
    train_comparison = torch.zeros(repeats,1)
    test_comparison  = torch.zeros(repeats,1)

    train_loss = torch.zeros(repeats, nb_epochs)
    test_loss = torch.zeros(repeats, nb_epochs)

    train_acc = torch.zeros(repeats, nb_epochs)
    test_acc = torch.zeros(repeats, nb_epochs)

    for i in range(repeats):
        # Reset every layer (linear, convolution, batch-norm) so a run does not
        # continue from the previous run's weights, then apply the Xavier
        # initialisation to the convolutions.
        model.apply(_reset_parameters)
        model.apply(weights_init)

        train_acc[i], test_acc[i], train_loss[i], test_loss[i] = train_model(model, train, train_classes,
            test, test_classes, mini_batch_size, eta, criterion, nb_epochs, momentum,
            optimizer_name)

        train_comparison[i] = compute_project_accuracy(
            model, train[:n_train_pairs], train[n_train_pairs:], train_target)
        test_comparison[i] = compute_project_accuracy(
            model, test[:n_test_pairs], test[n_test_pairs:], test_target)

    all_results.append({
        "Model": model.name,
        "Optimizer": optimizer_name,
        "Epochs": nb_epochs,
        "Eta": eta,
        "Train Accuracy Mean": train_comparison.mean().item(),
        "Test Accuracy Mean": test_comparison.mean().item(),
        # The spread is taken over the same per-run comparison accuracies as
        # the means above (NaN when ``repeats`` is 1).
        "Train Accuracy Std": train_comparison.std().item(),
        "Test Accuracy Std": test_comparison.std().item(),
        "Digit acc table": train_acc.mean(dim=0),
        "test Digit Accuracy Table": test_acc.mean(dim=0),
    })

    return all_results


# In [2]:
from torch import empty
import torch
import math

class Module:
    """Base class of the hand-written framework's layers.

    Subclasses override ``forward`` and ``backward``; ``param`` returns an
    empty list unless a subclass exposes parameters.
    """

    def forward(self, *input):
        """Compute the layer output; must be overridden."""
        raise NotImplementedError()

    def backward(self, *gradwrtoutput):
        """Propagate the gradient with respect to the output; must be overridden."""
        raise NotImplementedError()

    def param(self):
        """Return the layer's parameters; none for the base class."""
        return list()

class Linear(Module):
    """Applies a linear transformation to the incoming data: y = xA^T + b.

    Attributes:
        weight: the learnable weights, shape (out_features, in_features).
        bias: the learnable bias, shape (out_features). When the layer is
            built with ``bias=False`` it is a constant zero vector that is
            neither exposed by ``param`` nor given a gradient.
        grad_weight, grad_bias: gradients accumulated by ``backward``.

    Weights and bias are drawn uniformly from
    ``[-1/sqrt(in_features), 1/sqrt(in_features)]``.
    """

    def __init__(self, in_features, out_features, bias = True):
        init_bound = 1/math.sqrt(in_features)
        self.use_bias = bool(bias)
        self.weight = torch.empty(out_features, in_features).uniform_(-init_bound, init_bound)
        if self.use_bias:
            self.bias = torch.empty(out_features).uniform_(-init_bound, init_bound)
        else:
            self.bias = torch.zeros(out_features)
        self.grad_weight = torch.zeros_like(self.weight)
        self.grad_bias = torch.zeros_like(self.bias)
        # Input of the latest forward pass, needed to compute the weight gradient.
        self.previous_layer = None

    def forward (self, input):
        """Return ``input @ weight.T + bias`` and remember ``input`` for ``backward``."""
        self.previous_layer = input
        return input.mm(self.weight.T) + self.bias

    def backward (self, gradwrtoutput):
        """Accumulate the parameter gradients and return the gradient w.r.t. the input.

        Raises:
            RuntimeError: if called before any ``forward`` pass.
        """
        if self.previous_layer is None:
            raise RuntimeError("Linear.backward called before forward")
        self.grad_weight.add_(gradwrtoutput.T.mm(self.previous_layer))
        if self.use_bias:
            self.grad_bias.add_(gradwrtoutput.sum(0))
        grad_input = gradwrtoutput.mm(self.weight)
        return grad_input

    def param (self):
        """Return the ``(parameter, gradient)`` pairs an optimizer should update."""
        parameters = [(self.weight, self.grad_weight)]
        # A disabled bias must stay at zero, so it is not offered for updates.
        if self.use_bias:
            parameters.append((self.bias, self.grad_bias))
        return parameters

class ReLU(Module):
    """Rectified linear unit: keeps positive entries and zeroes the rest.

    The output of ``forward`` is always ``float32``, as is the derivative
    mask used by ``backward``.
    """

    def __init__(self):
        self.input = None

    def forward (self, input):
        """Return ``max(input, 0)`` element-wise and remember ``input`` for ``backward``."""
        self.input = input
        activations = input.float()
        # ``zeros_like`` allocates on the input's device with the right dtype,
        # avoiding a float64 CPU temporary.
        return torch.where(input > 0, activations, torch.zeros_like(activations))

    def backward (self, gradwrtoutput):
        """Return ``gradwrtoutput`` masked by where the stored input was positive."""
        derivative = (self.input > 0).float()
        return gradwrtoutput * derivative

    def param (self):
        """Return a placeholder pair; the layer has no parameters."""
        return [(None, None)]

class Tanh(Module):
    """Hyperbolic tangent activation."""

    def __init__(self):
        self.input = None

    def forward (self, input):
        """Return ``tanh(input)`` and remember ``input`` for ``backward``."""
        self.input = input
        activation = input.tanh()
        return activation

    def backward (self, gradwrtoutput):
        """Return ``gradwrtoutput * (1 - tanh(input)^2)`` for the stored input."""
        activation = self.input.tanh()
        derivative = 1 - activation.pow(2)
        return gradwrtoutput * derivative

    def param (self):
        """Return a placeholder pair; the layer has no parameters."""
        return [(None, None)]

class Sequential(Module):
    """Chains modules: the output of one is the input of the next."""

    def __init__(self, *modules):
        self.modules = list(modules)

    def forward (self, input):
        """Run ``input`` through every module in order and return the result."""
        output = input
        for layer in self.modules:
            output = layer.forward(output)
        return output

    def backward (self, gradwrtoutput):
        """Propagate ``gradwrtoutput`` through the modules from last to first."""
        gradient = gradwrtoutput
        for layer in self.modules[::-1]:
            gradient = layer.backward(gradient)
        return gradient

    def param (self):
        """Return one parameter list per module, in module order."""
        return [layer.param() for layer in self.modules]

class SGD:
    """Plain gradient descent over ``(parameter, gradient)`` pairs.

    ``parameters`` is an iterable of per-module lists of pairs; pairs that
    contain ``None`` are skipped.
    """

    def __init__(self, parameters, eta):
        self.parameters = parameters
        if eta < 0.0:
            raise ValueError("Invalid learning rate: {}".format(eta))
        self.eta = eta

    def _trainable_pairs(self):
        """Yield every pair in which both the parameter and its gradient exist."""
        for module_params in self.parameters:
            for param, grad_param in module_params:
                if param is None or grad_param is None:
                    continue
                yield param, grad_param

    def step(self):
        """Subtract ``eta * gradient`` from every parameter in place."""
        for param, grad_param in self._trainable_pairs():
            param.sub_(self.eta * grad_param)

    def zero_grad(self):
        """Reset every gradient to zero in place."""
        for _, grad_param in self._trainable_pairs():
            grad_param.zero_()

class LossMSE:
    """Sum-of-squared-errors loss (no averaging over elements)."""

    def loss(self, prediction, target):
        """Return the sum of the squared differences as a 0-dim tensor."""
        squared_error = (prediction - target) ** 2
        return squared_error.sum()

    def grad(self, prediction, target):
        """Return the gradient of ``loss`` with respect to ``prediction``."""
        difference = prediction - target
        return 2 * difference

def normalize_data(x):
    """Standardise ``x`` in place using its global mean and standard deviation.

    Both statistics are taken over all elements before ``x`` is modified.
    Nothing is returned.
    """
    centre = x.mean()
    spread = x.std()
    x.sub_(centre)
    x.div_(spread)

def train_model(model, train, train_target, test, test_target,
                mini_batch_size, eta, nb_epochs, normalize = True):
    """Train a framework ``model`` with mini-batch SGD on the MSE loss.

    Args:
        model: object exposing ``forward``, ``backward`` and ``param``.
        train, test: input tensors. When ``normalize`` is true both are
            standardised IN PLACE, each with its own statistics.
        train_target, test_target: one-hot targets (the class is recovered
            with ``argmax(1)``).
        mini_batch_size: number of samples per optimisation step. The last
            batch of an epoch may be smaller when the training set size is not
            a multiple of it.
        eta: learning rate.
        nb_epochs: number of passes over the training set.
        normalize: whether to standardise the inputs first.

    Returns:
        ``(train_accuracy, test_accuracy, train_loss, test_loss)``, each a
        tensor of length ``nb_epochs``. ``train_loss`` is the loss summed over
        all mini-batches of the epoch.
    """
    train_accuracy = torch.zeros(nb_epochs)
    test_accuracy = torch.zeros(nb_epochs)
    train_loss = torch.zeros(nb_epochs)
    test_loss = torch.zeros(nb_epochs)
    N_train = train.size(0)
    N_test = test.size(0)

    if normalize:
        normalize_data(train)
        normalize_data(test)

    optimizer = SGD(model.param(), eta)
    MSE = LossMSE()

    for epoch in range(nb_epochs):

        nb_correct_classes_tr = 0
        loss = 0

        for start in range(0, N_train, mini_batch_size):
            # Slicing clamps at the end of the data, so a ragged final batch
            # is handled instead of raising as ``narrow`` would.
            stop = start + mini_batch_size
            batch_input = train[start:stop]
            batch_target = train_target[start:stop]

            optimizer.zero_grad()
            output = model.forward(batch_input)
            loss += MSE.loss(output, batch_target)
            model.backward(MSE.grad(output, batch_target))
            optimizer.step()
            nb_correct_classes_tr += (batch_target.argmax(1) == output.argmax(1)).sum().item()

        train_loss[epoch] = loss

        # Test metrics need a forward pass only; no backward pass or update is run.
        output = model.forward(test)
        test_loss[epoch] = MSE.loss(output, test_target).item()
        nb_correct_classes_te = (test_target.argmax(1) == output.argmax(1)).sum().item()

        # compute accuracy
        train_accuracy[epoch] = nb_correct_classes_tr / N_train
        test_accuracy[epoch] = nb_correct_classes_te / N_test

    return train_accuracy, test_accuracy, train_loss, test_loss


def generate_data(size):
    """Sample ``size`` points uniformly in the unit square and label them.

    A point is labelled 1 when its squared distance to (0.5, 0.5) exceeds
    ``1 / (2*pi)`` and 0 otherwise.

    Returns:
        ``(points, labels)`` with shapes ``(size, 2)`` and ``(size,)``;
        ``labels`` is an int64 tensor.
    """
    points = torch.Tensor(size, 2).uniform_(0, 1)
    centre = torch.tensor([0.5, 0.5])
    squared_radius = 1 / (2*math.pi)
    squared_distance = points.sub(centre).pow(2).sum(1)
    # sign() is +1 outside the disc and -1 inside; (sign + 1) / 2 maps that to 1 / 0.
    labels = squared_distance.sub(squared_radius).sign().add(1).div(2).long()
    return points, labels

def one_hot_encoding(target, nb_classes = 2):
    """Return the one-hot encoding of the integer class vector ``target``.

    Args:
        target: 1-D integer tensor of class indices in ``[0, nb_classes)``.
        nb_classes: number of columns of the result (defaults to 2).

    Returns:
        A float tensor of shape ``(len(target), nb_classes)`` with a single 1
        per row.
    """
    nb_samples = target.size(0)
    onehot = torch.zeros(nb_samples, nb_classes)
    onehot[torch.arange(nb_samples), target] = 1
    return onehot


# In [3]:

import json
import os

# NOTE(review): `prologue` is not imported anywhere in the visible code; it must
# be imported (presumably the data-loading helper module) before this line runs.
train_input, train_target, train_classes, test_input, test_target, test_classes = prologue.generate_pair_sets(1000)

# Stack the first image of every pair on top of the second one, so the first
# half of each set holds the first elements and the second half the second ones.
train = torch.cat(( train_input[:, 0, :, :], train_input[:, 1, :, :]), 0)
test = torch.cat((test_input[:, 0, :, :], test_input[:, 1, :, :]), 0)

train_classes = torch.cat((train_classes[:, 0], train_classes[:, 1]), 0)
test_classes = torch.cat((test_classes[:, 0], test_classes[:, 1]), 0)

# Classes rather than instances: a fresh model is built for every configuration
# so that the dropout setting is actually applied.
model_classes = [ShallowFCNet, DeepFCNet, BasicCNN, BasicCNN_bn, LeNet4, LeNet5, ResNet]
optimizers = ['SGD']
dropouts = [0, 0.25]
criterions = [nn.CrossEntropyLoss(), nn.MultiMarginLoss()]
epochs = [1]

all_results = []
try:
    PATH =  os.path.dirname(os.path.abspath(__file__))
except NameError:
    # `__file__` does not exist in an interactive session; use the working directory.
    PATH = os.getcwd()
models_dir = os.path.join(PATH, "models")
os.makedirs(models_dir, exist_ok=True)


def _to_jsonable(value):
    """Convert the tensors stored in the result dicts to plain lists for JSON."""
    if isinstance(value, torch.Tensor):
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


for model_class in model_classes:
    for optimizer in optimizers:
        for criterion in criterions:
            for dropout in dropouts:
                for epoch in epochs:
                    model = model_class(dropout=dropout)
                    # Keep the results of every configuration, not only the last one.
                    all_results.extend(train_test(model, train, test, train_classes,
                                test_classes, train_target, test_target, 100,
                                criterion, epoch, optimizer_name = optimizer))
                    # The criterion is part of the file name so that runs which
                    # differ only in their loss do not overwrite each other.
                    file_name = "_".join([model.name, optimizer, type(criterion).__name__,
                                          str(dropout), str(epoch)]) + ".pt"
                    torch.save(model.state_dict(), os.path.join(models_dir, file_name))

with open('comparison_models.json', 'w') as json_file:
    json.dump(all_results, json_file, default=_to_jsonable)
print(all_results)

print('Project 1 done')
print('')


# Draw the training set first, then the test set.
train, train_target = generate_data(1000)
test, test_target = generate_data(1000)

train_one_hot_target, test_one_hot_target = (
    one_hot_encoding(labels) for labels in (train_target, test_target)
)

# Requirements given by project2
input_units, output_units, nb_hidden_units = 2, 2, 25

# Three hidden Tanh layers of equal width followed by a linear output layer.
layer_sizes = [input_units] + [nb_hidden_units] * 3 + [output_units]
layers = []
for fan_in, fan_out in zip(layer_sizes, layer_sizes[1:]):
    layers.append(Linear(fan_in, fan_out))
    layers.append(Tanh())
layers.pop()  # the output layer is not followed by an activation
model = Sequential(*layers)

train_model(model, train, train_one_hot_target, test, test_one_hot_target,
            mini_batch_size=25, eta=1e-4, nb_epochs=100)




# NameError: name 'prologue' is not defined