In [56]:
import torch
import math
import matplotlib.pyplot as plt

torch.set_grad_enabled(False)

<torch.autograd.grad_mode.set_grad_enabled at 0x7ffa9a609700>

In [57]:
class Module(object) :
    def __init__(self):
        super().__init__()
    
    def forward(self , *input):
        raise  NotImplementedError
        
    def backward(self , *gradwrtoutput):
        raise  NotImplementedError
        
    def param(self): # These are the layers of the network
        return  []

In [58]:
class Sequential(Module):
    def __init__(self, param):
        super().__init__()
        self.model = (param)
        self.loss = LossMSE()
    
    def forward(self, x):
        for layer in self.model:
            x = layer.forward(x)
        return x
    
    def backward(self, output, target):
        grad = self.loss.backward(target, output)
        for layer in reversed(self.model):
            grad = layer.backward(grad)
        
        Loss = self.loss.forward(target, output)
        return Loss

In [117]:
class Linear(Module):
    def __init__(self, in_, out_):
        super().__init__()
        self.in_ = in_
        self.out_ = out_
        
        # Capture the term at each layer before the passage in the layer
        # and the activation function.
        self.x = torch.zeros(out_)
        
        # Initialization of the weights and the bias
        param = 1. / math.sqrt(in_)
        self.weight = torch.empty(self.in_, self.out_).uniform_(-param, param)
        self.bias = torch.empty(self.out_).uniform_(-param, param)
        
    def forward(self, x):
        self.x = x
        #print('w = ', self.weight.size())
        #print('b = ', self.bias.size())
        #print('res =',(x.mm(self.weight) + self.bias).size())
        return x.mm(self.weight) + self.bias
    
    def set_Lr(self, lr):
        self.lr = lr
        return
        
    def backward(self, grad):
        lr = 0.005
        self.weight = self.weight - lr * self.x.t().mm(grad)
        self.bias = self.bias - lr * grad * 1
        grad = grad.mm(self.weight.t())
        return grad
    
    def weight(self):
        return self.weight
    
    def bias(self):
        return self.bias

In [118]:
class LossMSE(object):
    def __init__(self):
        super().__init__() 
    
    def forward(self, data_target, data_output):
        loss = (data_output - data_target).pow(2).sum()
        return loss
    
    def backward(self, data_target, data_output):
        dloss = 2 * (data_output - data_target)
        return dloss

In [119]:
class ReLU(Module):
    
    def __init__(self ):
        super().__init__()
        self.save = 0
        
    def forward(self, x):
        y = x.clamp(min = 0)
        self.save = x
        return y
    
    def backward(self, x):
        y = self.save > 0
        return y.float() * x
         
    def print(self):
        return

In [120]:
class Tanh(Module) :
    def __init__(self, ):
        super().__init__()
        self.save = 0
    
    def  forward(self, x):
        self.save = x
        return torch.div(x.exp() - (-x).exp(), x.exp() + (-x).exp())
        
    def  backward(self, x):
        return (1 - torch.div(self.save.exp() - 
                    (-self.save).exp(), self.save.exp() + (-self.save).exp())**2) * x
        
    def print(self):
        return

In [121]:
def create_random_batch(input_size, mini_batch_size):
    
    # This function return a 2D tensor that is the rando selection of inputs for our
    # stochastic gradient method, taking in count the number of mini_batches.
    
    # We suppose here that our mini_batch_size is well chosen taking in count the fact
    # that it divides input_size.
    
    # Initialization
    L = int(input_size / mini_batch_size)
    new_batch = torch.ones(L, mini_batch_size)
    
    indices = torch.randperm(input_size)
    for k in range(L):
        new_batch[k] = indices[k * mini_batch_size : (k+1) * mini_batch_size]
    
    return new_batch

In [122]:
def train_model(model, train_input, train_classes, nb_epochs, mini_batch_size):
    
    h_step = 1e-3
    
    for epoch in range(nb_epochs):
        random_batches = create_random_batch(train_input.size(0), mini_batch_size).tolist()
        for batch in range(0, train_input.size(0), mini_batch_size):
            output = model.forward(train_input.narrow(0, batch, mini_batch_size))
            loss = model.backward(output, train_classes.narrow(0, batch, mini_batch_size))

In [123]:
def compute_nb_errors(model, data_input, data_target, mini_batch_size):
    
    nb_data_errors = 0

    for b in range(0, data_input.size(0), mini_batch_size):
        result = model.forward(data_input.narrow(0, b, mini_batch_size))
        
        # Faster
        predicted_classes = result >= (0.5 * torch.ones(result.size(0),1))
        predicted_classes = predicted_classes.int()
        
        for k in range(mini_batch_size):
            if data_target[b + k] != predicted_classes[k]:
                nb_data_errors = nb_data_errors + 1
                
    return nb_data_errors

In [124]:
def create_problem(nb_samples):
    
    # Remark: the function .uniform return a uniform distribution on [0,1) instead of [0,1],
    # but in our case it's not a problem since it is only a train and a test set on a circle
    # that do not touch the border of the set [0,1]^2.
    train_input = torch.empty(nb_samples, 2).uniform_(0, 1)
    test_input = torch.empty(nb_samples, 2).uniform_(0, 1)
    
    # Radius of our circle
    R = 1 / math.sqrt(2 * math.pi)
    
    train_classes = train_input.sub(0.5).pow(2).sum(1).sub(R**2).sign().sub(1).div(-2).long().resize_((nb_samples,1))
    test_classes = test_input.sub(0.5).pow(2).sum(1).sub(R**2).sign().sub(1).div(-2).long().resize_((nb_samples,1))
    
    return train_input, train_classes, test_input, test_classes

In [125]:
def get_tests(n):
    M = []
    for k in range (n):
        L = []
        _, _, test_input, test_classes =  create_problem(1000)
        L.append(test_input)
        L.append(test_classes)
        M.append(L)
    return M

In [126]:
train_input, train_classes, _, _ = create_problem(1000)
#print(train_input.size())
#print(train_input.narrow(0, b, mini_batch_size).size())
nb_epochs = 50
mini_batch_size = 10

model = Sequential([Linear(2,25), Tanh(), Linear(25,25), Tanh(), Linear(25,1), Tanh()])
train_model(model, train_input, train_classes, nb_epochs, mini_batch_size)

In [127]:
nb_train_errors = compute_nb_errors(model, train_input, train_classes, mini_batch_size)
print('train error {:0.2f}% {:f}/{:f}'.format((100 * nb_train_errors) / train_input.size(0), nb_train_errors, train_classes.size(0)))

L = get_tests(10)
average_nb_test_error = 0
for k in range (0, len(L)):
    nb_test_errors = compute_nb_errors(model, L[k][0], L[k][1], mini_batch_size)
    average_nb_test_error += nb_test_errors
    print('test error {:0.2f}% {:f}/{:f}'.format((100 * nb_test_errors) / L[k][0].size(0), nb_test_errors, L[k][0].size(0)))
print('Average test error {:0.2f}% {:0.1f}/{:d}'.format((100*average_nb_test_error/len(L)) / L[0][0].size(0), average_nb_test_error/len(L), L[0][0].size(0)))

train error 16.80% 168.000000/1000.000000
test error 24.40% 244.000000/1000.000000
test error 26.70% 267.000000/1000.000000
test error 23.70% 237.000000/1000.000000
test error 27.10% 271.000000/1000.000000
test error 24.30% 243.000000/1000.000000
test error 27.60% 276.000000/1000.000000
test error 25.20% 252.000000/1000.000000
test error 25.80% 258.000000/1000.000000
test error 25.40% 254.000000/1000.000000
test error 24.40% 244.000000/1000.000000
Average test error 25.46% 254.6/1000


In [105]:
History_Loss = []
_, _, test_input, test_classes =  create_problem(1000)
for epochs in range(0, nb_epochs):
    Sum_Loss = 0
    for b in range(0, train_input.size(0), 1):
        output = model.forward(train_input.narrow(0, b, 1))
        Loss = model.backward(train_classes.narrow(0, b, 1),output)
        Sum_Loss = Sum_Loss + Loss.item()
    History_Loss.append(Sum_Loss)  
    print("Epoch: {}, Train Error: {:.4f}%, Test Error: {:.4f}%, Loss  {:.4f}".format(epochs+1,compute_nb_errors(model,train_input, train_classes,1)/train_input.size(0)*100,compute_nb_errors(model,test_input, test_classes,1)/test_input.size(0)*100,Sum_Loss))

RuntimeError: mat1 and mat2 shapes cannot be multiplied (2x1 and 100x25)

In [None]:
x = torch.ones(3,1)
y = torch.zeros(3).resize_(x.size())
print(x-y)