In [None]:
from torch import LongTensor, FloatTensor
import torch
import math

In [None]:
class Module(object):
    """Common interface shared by every layer, container and loss.

    Subclasses must override ``forward`` and ``backward``. The remaining
    methods default to "no trainable parameters" behaviour, so parameter-free
    modules can leave them alone.
    """

    def forward(self, *x):
        """Compute the module output; must be overridden."""
        raise NotImplementedError

    def backward(self, *gradwrtoutput):
        """Propagate a gradient backwards; must be overridden."""
        raise NotImplementedError

    def param(self):
        """Return the module's parameters; an empty list by default."""
        return list()

    def step(self, eta):
        """Apply one update with step size ``eta``; does nothing by default."""
        return None

    def grad_zero(self):
        """Reset accumulated gradients; does nothing by default."""
        return None

In [None]:
class Linear(Module):
    """Fully connected layer computing ``x.matmul(weight) + bias``.

    ``weight`` is stored with shape ``(in_features, out_features)`` and
    ``bias`` with shape ``(out_features,)``. Gradients are accumulated into
    ``weight_grad`` / ``bias_grad`` by ``backward`` until ``grad_zero`` is
    called.
    """

    def __init__(self, in_features, out_features):
        super(Linear, self).__init__()
        self.weight = FloatTensor(in_features, out_features)
        self.bias = FloatTensor(out_features)
        self.reset_parameters()
        self.bias_grad = FloatTensor(self.bias.size()).zero_()
        self.weight_grad = FloatTensor(self.weight.size()).zero_()
        # Input of the most recent forward pass, needed by backward().
        self.previous_x = None

    def reset_parameters(self):
        """Draw weight and bias uniformly from ``[-1/sqrt(fan_in), 1/sqrt(fan_in)]``."""
        # weight is laid out (in_features, out_features), so the fan-in is
        # dimension 0 (dimension 1 would be the fan-out).
        bound = 1 / math.sqrt(self.weight.size(0))
        self.weight.uniform_(-bound, bound)
        self.bias.uniform_(-bound, bound)

    def forward(self, x):
        """Return ``x.matmul(weight) + bias`` and remember ``x`` for backward().

        ``x`` is expected to be a 2-D batch ``(nb_samples, in_features)``
        because backward() transposes it with ``.t()``.
        """
        self.previous_x = x
        return x.matmul(self.weight) + self.bias

    def backward(self, *gradwrtoutput):
        """Accumulate parameter gradients and return the gradient w.r.t. the input.

        ``gradwrtoutput[0]`` is the gradient of the loss with respect to this
        layer's output for the batch seen by the last forward() call.
        """
        grad_output = gradwrtoutput[0]
        self.bias_grad.add_(grad_output.sum(0))
        self.weight_grad.add_(self.previous_x.t().matmul(grad_output))
        return grad_output.matmul(self.weight.t())

    def step(self, eta):
        """Take one gradient-descent step of size ``eta``.

        The update is done in place so that the tensors previously handed out
        by param() keep referring to the live parameters.
        """
        self.weight -= eta * self.weight_grad
        self.bias -= eta * self.bias_grad

    def grad_zero(self):
        """Reset the accumulated gradients to zero."""
        self.bias_grad.zero_()
        self.weight_grad.zero_()

    def param(self):
        """Return ``[(weight, weight_grad), (bias, bias_grad)]``."""
        return [(self.weight, self.weight_grad), (self.bias, self.bias_grad)]
        

In [None]:
class Sequential(Module):
    """Container that runs its modules one after another.

    The modules are given as positional arguments, in the order in which the
    forward pass should traverse them.
    """

    def __init__(self, *args):
        self.module_array = list(args)
        super(Sequential, self).__init__()

    def forward(self, *x):
        """Feed ``x[0]`` through every module in order and return the result."""
        activation = x[0]
        for layer in self.module_array:
            activation = layer.forward(activation)
        return activation

    def backward(self, *gradwrtoutput):
        """Send ``gradwrtoutput[0]`` back through the modules, last one first."""
        grad = gradwrtoutput[0]
        for layer in reversed(self.module_array):
            grad = layer.backward(grad)
        return grad

    def param(self):
        """Return the concatenation of every module's ``param()`` list."""
        return [entry for layer in self.module_array for entry in layer.param()]

    def step(self, eta):
        """Forward the update step of size ``eta`` to every module."""
        for layer in self.module_array:
            layer.step(eta)

    def grad_zero(self):
        """Ask every module to reset its accumulated gradients."""
        for layer in self.module_array:
            layer.grad_zero()
        

In [None]:
class ReLU(Module):
    """Rectified linear unit: non-positive entries become 0, others pass through."""

    def __init__(self):
        super(ReLU, self).__init__()
        # Output of the most recent forward pass, needed by backward().
        self.temp = None

    def forward(self, x):
        """Return a rectified copy of ``x``; the caller's tensor is left untouched."""
        out = x.clone()
        out[out <= 0] = 0
        self.temp = out
        return out

    def backward(self, *gradwrtoutput):
        """Multiply the incoming gradient ``gradwrtoutput[0]`` by the ReLU derivative."""
        return self.dRelu(self.temp) * gradwrtoutput[0]

    def dRelu(self, x):
        """Return a new tensor holding 1 where ``x > 0`` and 0 where ``x < 0``.

        Entries equal to 0 stay 0. ``x`` itself is not modified: it is the
        cached activation that forward() also returned to the caller, so
        overwriting it would corrupt that output.
        """
        derivative = x.clone()
        derivative[x > 0] = 1
        derivative[x < 0] = 0
        return derivative
        

class Tanh(Module):
    """Element-wise hyperbolic tangent activation."""

    def __init__(self):
        super(Tanh, self).__init__()
        # Input of the most recent forward pass, needed by backward().
        self.temp = None

    def forward(self, x):
        """Remember ``x`` and return ``tanh(x)``."""
        self.temp = x
        activated = torch.tanh(x)
        return activated

    def backward(self, *gradwrtoutput):
        """Scale the incoming gradient ``gradwrtoutput[0]`` by tanh'(cached input)."""
        upstream = gradwrtoutput[0]
        local = self.dTanh(self.temp)
        return local * upstream

    def dTanh(self, x):
        """Return ``4 / (e^x + e^-x)^2``, the derivative of tanh at ``x``."""
        exp_sum = x.exp() + (-x).exp()
        return 4 * exp_sum.pow(-2)
        
    
class MSELoss(Module):
    """Squared-error loss between a prediction and a target.

    Note that the loss is the *sum* of the squared differences over all
    elements; it is not divided by the number of elements.
    """

    def __init__(self):
        super(MSELoss, self).__init__()

    def forward(self, *x):
        """Return ``sum((x[0] - x[1]) ** 2)`` for prediction ``x[0]`` and target ``x[1]``."""
        return (x[0] - x[1]).pow(2).sum()

    def backward(self, *gradwrtoutput):
        """Return the gradient of the loss with respect to the prediction.

        Takes the same ``(prediction, target)`` arguments as forward(). The
        derivative of ``(prediction - target) ** 2`` with respect to the
        prediction is ``2 * (prediction - target)``; returning the opposite
        sign would make a gradient-descent step increase the loss.
        """
        prediction, target = gradwrtoutput[0], gradwrtoutput[1]
        return 2 * (prediction - target)

In [None]:

def create_target(target):
    """Turn class labels into two-column +/-1 targets.

    A label equal to 1 becomes the row ``(-1, 1)``; any other label becomes
    ``(1, -1)``. The result is a LongTensor with one row per label.
    """
    is_one = [bool(label == 1) for label in target]
    first_column = [-1 if flag else 1 for flag in is_one]
    second_column = [1 if flag else -1 for flag in is_one]
    return LongTensor([first_column, second_column]).t()


        

In [None]:
def generate_disc_set(nb):
    """Sample ``nb`` points uniformly in [0, 1]^2 and label them against a disc.

    A point gets label 1 when its squared distance to the origin is greater
    than ``1 / (2 * pi)`` and label 0 otherwise. Returns ``(points, labels)``
    where ``labels`` is a long tensor.
    """
    points = FloatTensor(nb, 2).uniform_(0, 1)
    squared_radius = 1 / (2 * math.pi)
    squared_norm = points.pow(2).sum(1)
    # sign() gives -1/0/+1; shifting and halving maps it to 0/0.5/1, and the
    # cast to long truncates 0.5 down to 0.
    labels = squared_norm.sub(squared_radius).sign().add(1).div(2).long()
    return points, labels

In [None]:
def generate(nb):
    """Sample ``nb`` points uniformly in [-1, 1]^2, labelled by the sign of x.

    Returns ``(points, labels)`` where ``labels`` is the element-wise result
    of comparing each point's first coordinate with 0 (set when negative).
    """
    points = FloatTensor(nb, 2).uniform_(-1, 1)
    first_coordinate = points[:, 0]
    labels = first_coordinate < 0
    return points, labels

In [None]:
# Seed the global torch RNG first so that the sampled data sets are identical
# on every "Restart Kernel -> Run All".
torch.manual_seed(0)

# Independent training and test sets of 1000 points each.
train_input, train_target = generate(1000)
test_input, test_target = generate(1000)

# Number of training points carrying a set label (class balance check).
train_target.sum()

In [None]:
def train_model(model, train_input, train_target, mini_batch_size=10, eta=1e-1, nb_epochs=2000):
    """Train ``model`` with mini-batch gradient descent on the squared-error loss.

    Parameters:
        model: object exposing ``forward``, ``backward``, ``grad_zero`` and ``step``.
        train_input: tensor whose first dimension indexes the samples.
        train_target: tensor of targets aligned with ``train_input``.
        mini_batch_size: number of samples per update; the last batch of an
            epoch is shorter when the sample count is not a multiple of it.
        eta: step size passed to ``model.step``.
        nb_epochs: number of passes over the training set.

    The loss accumulated over each epoch is printed once per epoch.
    """
    mse_loss = MSELoss()
    nb_samples = train_input.size(0)

    for _ in range(0, nb_epochs):
        sum_loss = 0
        for b in range(0, nb_samples, mini_batch_size):
            # Clamp the batch length so narrow() never reaches past the end.
            batch_size = min(mini_batch_size, nb_samples - b)
            batch_input = train_input.narrow(0, b, batch_size)
            batch_target = train_target.narrow(0, b, batch_size)

            output = model.forward(batch_input)
            sum_loss += mse_loss.forward(output, batch_target)
            grad_loss = mse_loss.backward(output, batch_target)

            # Gradients are accumulated by backward(), so clear them first.
            model.grad_zero()
            model.backward(grad_loss)
            model.step(eta)
        print(sum_loss)

In [None]:
def compute_nb_errors(model, data_input, data_target, mini_batch_size=10):
    """Count the samples whose predicted class differs from the target class.

    Parameters:
        model: object exposing ``forward``; its output must have one score
            per class along dimension 1.
        data_input: tensor whose first dimension indexes the samples.
        data_target: class index of every sample, aligned with ``data_input``.
        mini_batch_size: number of samples evaluated per forward pass; the
            last batch is shorter when the sample count is not a multiple of it.

    Returns:
        The number of misclassified samples as a Python int.
    """
    nb_data_errors = 0
    nb_samples = data_input.size(0)

    for b in range(0, nb_samples, mini_batch_size):
        # Clamp the batch length so narrow() and the loop below stay in range.
        batch_size = min(mini_batch_size, nb_samples - b)
        output = model.forward(data_input.narrow(0, b, batch_size))
        # The predicted class is the index of the largest score in each row.
        _, predicted_classes = output.max(1)
        for k in range(0, batch_size):
            if data_target[b + k] != predicted_classes[k]:
                nb_data_errors = nb_data_errors + 1

    return nb_data_errors

In [None]:
# Four fully connected layers (2 -> 25 -> 25 -> 25 -> 2), each followed by Tanh.
model = Sequential(
    Linear(2, 25), Tanh(),
    Linear(25, 25), Tanh(),
    Linear(25, 25), Tanh(),
    Linear(25, 2), Tanh(),
)

# Size of the +/-1 encoded training targets.
print(create_target(train_target).size())

# Train on the +/-1 encoded targets, then count the test-set errors.
train_model(model, train_input, create_target(train_target).float())
compute_nb_errors(model, test_input, test_target)

In [None]:
# Number of training samples whose label is set; generate() sets the label
# when a point's first coordinate is negative.
train_target.sum()