In [1]:
from torch import empty, zeros
import math

In [2]:
class Module(object):
    """Common interface shared by every layer, container and loss.

    Subclasses are expected to override ``forward`` and ``backward``.
    The remaining hooks default to "this module has no trainable state".
    """

    def __init__(self):
        """The base class keeps no state of its own."""

    def forward(self, *input_):
        """Compute the module output; must be provided by subclasses."""
        raise NotImplementedError()

    def backward(self, *gradwrtoutput):
        """Propagate a gradient backwards; must be provided by subclasses."""
        raise NotImplementedError()

    def param(self):
        """Return the trainable parameters: none at this level."""
        return list()

    def update(self, lr):
        """Apply one parameter update with step size ``lr``; a no-op here."""
        return None

    def zero_grad(self):
        """Reset accumulated gradients; a no-op here."""
        return None


In [3]:
class Linear(Module):
    """Fully connected layer computing ``x @ W.T + b`` with hand-written gradients.

    Attributes:
        W: weight matrix of shape ``(N_out, N_in)``, drawn from a standard normal.
        b: bias vector of shape ``(N_out,)``, drawn from a standard normal.
        gradW: accumulated gradient w.r.t. ``W``, shape ``(N_out, N_in)``.
        gradb: accumulated gradient w.r.t. ``b``, shape ``(1, N_out)``.
    """

    def __init__(self, N_in, N_out):
        """Create the layer.

        Args:
            N_in: number of input features.
            N_out: number of output features.
        """
        super(Linear, self).__init__()
        self.N_in = N_in
        self.N_out = N_out

        self.W = empty((N_out, N_in)).normal_()
        self.b = empty(N_out).normal_()

        # Gradient accumulators must start at zero; filling them with random
        # noise would corrupt the first update whenever zero_grad() is skipped.
        self.gradW = zeros((N_out, N_in))
        self.gradb = zeros(1, N_out)

    def forward(self, *input_):
        """Apply the affine map to a batch.

        Args:
            *input_: ``input_[0]`` is a 2-D tensor of shape ``(batch, N_in)``.

        Returns:
            Tensor of shape ``(batch, N_out)``.
        """
        # Keep a private copy of the input: backward() needs it for gradW.
        self.x = input_[0].clone()

        return self.x.mm(self.W.t()) + self.b

    def backward(self, *gradwrtoutput):
        """Accumulate parameter gradients and return the gradient w.r.t. the input.

        Args:
            *gradwrtoutput: ``gradwrtoutput[0]`` is dl/ds for this layer, shape
                ``(batch, N_out)``.

        Returns:
            Gradient w.r.t. the layer input, shape ``(batch, N_in)``.
        """
        grad_out = gradwrtoutput[0]

        # grad_W += dl/ds^T . x   (summed over the batch by the matrix product)
        self.gradW = self.gradW + grad_out.t().mm(self.x)
        # grad_b += sum over the batch of dl/ds. Reducing here keeps gradb at
        # shape (1, N_out) so that update() also works for batch sizes > 1.
        self.gradb = self.gradb + grad_out.sum(0, keepdim=True)

        return grad_out.mm(self.W)

    def param(self):
        """Return ``[(W, gradW), (b as a (1, N_out) view, gradb)]``."""
        return [(self.W, self.gradW), (self.b.view(1, -1), self.gradb)]

    def update(self, lr):
        """Take one gradient-descent step of size ``lr`` on ``W`` and ``b`` in place."""
        self.W.sub_(lr * self.gradW)
        # view() shares storage with b, so the in-place subtraction updates b.
        self.b.view(1, -1).sub_(lr * self.gradb)

    def zero_grad(self):
        """Replace both gradient accumulators with fresh zero tensors."""
        self.gradW = zeros((self.N_out, self.N_in))
        self.gradb = zeros(1, self.N_out)

In [4]:
class ReLU(Module):
    """Rectified linear activation with a hand-written backward pass."""

    def __init__(self):
        super(ReLU, self).__init__()

    def forward(self, *input_):
        """Zero out the negative entries of ``input_[0]``.

        The pre-activation is remembered in ``self.s`` for ``backward``; the
        caller's tensor is never modified.
        """
        self.s = input_[0].clone()

        rectified = self.s.clone()
        rectified[rectified < 0] = 0.
        return rectified

    def backward(self, *gradwrtoutput):
        """Multiply the incoming gradient by the derivative of ReLU.

        The derivative is 1 where the stored pre-activation was positive and
        0 where it was negative; entries that were exactly 0 stay 0.
        """
        upstream = gradwrtoutput[0]

        # Both masks are taken from the stored pre-activation before writing.
        was_positive = self.s > 0
        was_negative = self.s < 0

        slope = self.s.clone()
        slope[was_positive] = 1
        slope[was_negative] = 0

        return slope * upstream

    def param(self):
        """ReLU has no trainable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset."""
        pass

In [5]:
class Tanh(Module):
    """Hyperbolic-tangent activation with a hand-written backward pass."""

    def __init__(self):
        super(Tanh, self).__init__()

    def forward(self, *input_):
        """Return ``tanh(input_[0])`` and remember a copy of the pre-activation."""
        pre_activation = input_[0]
        self.s = pre_activation.clone()

        return pre_activation.tanh()

    def backward(self, *gradwrtoutput):
        """Scale the incoming gradient by ``1 - tanh(s)^2``.

        ``s`` is the pre-activation stored by the last ``forward`` call.
        """
        upstream = gradwrtoutput[0]
        tanh_squared = self.s.tanh().pow(2)

        return (1 - tanh_squared) * upstream

    def param(self):
        """Tanh has no trainable parameters."""
        return []

    def zero_grad(self):
        """Nothing to reset."""
        pass

In [6]:
class Sequential(Module):
    """Container that chains modules: forward in order, backward in reverse."""

    def __init__(self, *modules):
        """Store the modules in the order they should be applied."""
        super(Sequential, self).__init__()
        self.modules = modules

    def forward(self, *input_):
        """Feed ``input_[0]`` through every module in turn and return the result."""
        # Work on a private copy so the caller's tensor is never touched.
        x = input_[0].clone()

        for m in self.modules:
            # Copy each output so a later module cannot alias a tensor that an
            # earlier module may still be holding on to.
            x = m.forward(x).clone()

        return x

    def backward(self, *gradwrtoutput):
        """Back-propagate ``gradwrtoutput[0]`` through the modules in reverse.

        Returns:
            The gradient w.r.t. the container's input. It was previously
            dropped, which made it impossible to nest a Sequential inside
            another one.
        """
        x = gradwrtoutput[0].clone()

        for m in reversed(self.modules):
            x = m.backward(x).clone()

        return x

    def param(self):
        """Return the parameters of all contained modules as one flat list."""
        return [p for m in self.modules for p in m.param()]

    def update(self, lr):
        """Ask every contained module to take an update step of size ``lr``."""
        for m in self.modules:
            m.update(lr)

    def zero_grad(self):
        """Reset the accumulated gradients of every contained module."""
        for m in self.modules:
            m.zero_grad()

In [7]:
"""class Loss: 
    ...
"""
class LossMSE(Module):
    """Squared-error loss against one-hot encoded integer class labels.

    Note that the loss is the *sum* of squared errors over all entries; it is
    not divided by the number of samples.
    """

    def __init__(self):
        super(LossMSE, self).__init__()

    def forward(self, y, target):
        """Return the summed squared error between ``y`` and the one-hot targets.

        Args:
            y: predictions; the size of the last dimension is taken as the
                number of classes.
            target: integer class indices, one per sample (used as the index
                argument of ``scatter_``).

        Returns:
            A 0-dimensional tensor holding the loss.
        """
        self.y = y.clone()

        # Width of the one-hot encoding follows the prediction instead of
        # being fixed to 2, so the loss works for any number of classes.
        nb_classes = y.shape[-1]
        target_onehot = zeros((target.shape[0], nb_classes))
        self.target = target_onehot.scatter_(1, target.view(-1, 1), 1)

        e = self.y - self.target

        return e.pow(2).sum()

    def backward(self):
        """Return the loss gradient w.r.t. the prediction, ``2 * (y - target)``.

        Uses the tensors stored by the most recent ``forward`` call.
        """
        e = self.y - self.target

        return 2 * e
        
        

In [8]:
class Optimizer:
    """Minimal optimizer interface: subclasses implement ``step``."""

    def __init__(self):
        pass

    def step(self):
        """Apply one optimisation step; must be provided by subclasses."""
        raise NotImplementedError


class SGD(Optimizer):
    """Plain stochastic gradient descent with a fixed learning rate."""

    def __init__(self, model, lr):
        """Remember the model to optimise and the step size.

        Args:
            model: any object exposing ``update(lr)``.
            lr: learning rate handed to ``model.update``.
        """
        super(SGD, self).__init__()
        self.model = model
        self.lr = lr

    def step(self):
        """Ask the wrapped model to update its parameters with ``self.lr``."""
        # Use the model given to the constructor. The previous code referred to
        # a global named `model`, which only worked by accident in the notebook
        # and would silently update the wrong model otherwise.
        self.model.update(self.lr)

In [9]:
def generate_disc_set(nb):
    """Sample ``nb`` points uniformly in the square [-1, 1]^2 and label them.

    Returns:
        A ``(points, labels)`` pair: ``points`` has shape ``(nb, 2)``;
        ``labels`` is a long tensor of shape ``(nb,)`` holding 1 for points
        whose squared norm exceeds ``2 / pi`` and 0 otherwise.
    """
    points = empty(nb, 2).uniform_(-1, 1)
    squared_norm = points.pow(2).sum(1)
    # sign() gives +1 / -1 around the threshold; (sign + 1) / 2 turns that
    # into 1 / 0 before truncating to integers.
    labels = squared_norm.sub(2 / math.pi).sign().add(1).div(2).long()
    return points, labels

# Draw two independent 1000-point samples: the training set first, then the
# held-out test set.
(train_input, train_target), (test_input, test_target) = (
    generate_disc_set(1000),
    generate_disc_set(1000),
)

In [10]:
# Peek at the sampled training inputs (2-D points drawn uniformly from [-1, 1]).
train_input

tensor([[ 0.9012, -0.3991],
        [ 0.7823, -0.7173],
        [ 0.6446,  0.3661],
        ...,
        [ 0.2330,  0.2137],
        [ 0.9048, -0.5823],
        [ 0.3929, -0.9313]])

In [11]:
def train_model(model, train_input, train_target, mini_batch_size=1,
                lr=1e-4, nb_epochs=50):
    """Train ``model`` with mini-batch SGD on a summed squared-error loss.

    Args:
        model: module exposing ``forward``, ``backward``, ``zero_grad`` and
            ``update``.
        train_input: training samples, indexed along dimension 0.
        train_target: integer class labels, one per sample.
        mini_batch_size: number of samples per update. The last batch of an
            epoch may be smaller when the sample count is not a multiple of it.
        lr: learning rate handed to the SGD optimizer.
        nb_epochs: number of passes over the training set.

    Prints the loss accumulated over each epoch.
    """
    criterion = LossMSE()
    optimizer = SGD(model, lr=lr)
    nb_samples = train_input.size(0)

    for e in range(nb_epochs):
        sum_loss = 0.

        for b in range(0, nb_samples, mini_batch_size):
            # Clamp the length so the final, possibly shorter, batch does not
            # make narrow() run past the end of the data.
            batch_len = min(mini_batch_size, nb_samples - b)

            model.zero_grad()
            output = model.forward(train_input.narrow(0, b, batch_len))
            loss = criterion.forward(output, train_target.narrow(0, b, batch_len))

            sum_loss += loss

            l_grad = criterion.backward()
            model.backward(l_grad)
            optimizer.step()

        print("{} iteration: loss={}".format(e, sum_loss))

In [12]:
# Three hidden layers of 25 ReLU units, followed by a 2-unit linear output.
model = Sequential(
    Linear(2, 25),
    ReLU(),
    Linear(25, 25),
    ReLU(),
    Linear(25, 25),
    ReLU(),
    Linear(25, 2),
)

# Train with one sample per update.
train_model(model, train_input, train_target, mini_batch_size=1)

0 iteration: loss=40971.203125
1 iteration: loss=1617.0498046875
2 iteration: loss=1122.658935546875
3 iteration: loss=928.7864379882812
4 iteration: loss=771.3236083984375
5 iteration: loss=678.635009765625
6 iteration: loss=603.58837890625
7 iteration: loss=545.9110107421875
8 iteration: loss=496.76922607421875
9 iteration: loss=457.56561279296875
10 iteration: loss=425.74481201171875
11 iteration: loss=401.3797912597656
12 iteration: loss=381.05694580078125
13 iteration: loss=363.7950439453125
14 iteration: loss=348.4145812988281
15 iteration: loss=336.3956604003906
16 iteration: loss=324.5934753417969
17 iteration: loss=314.39886474609375
18 iteration: loss=304.3576965332031
19 iteration: loss=295.8401184082031
20 iteration: loss=286.9771423339844
21 iteration: loss=279.5616760253906
22 iteration: loss=272.9071350097656
23 iteration: loss=265.51904296875
24 iteration: loss=259.39361572265625
25 iteration: loss=253.3102264404297
26 iteration: loss=247.75823974609375
27 iteration: lo

In [19]:
import torch
def test(model, test_input, test_target):
    num_samples = test_input.size(0)
    prediction = model.forward(test_input)
    predicted_class = torch.argmax(prediction, axis=1)
    accuracy = sum(predicted_class == test_target).float() / num_samples
    return accuracy

In [20]:
# Accuracy of the trained model on the held-out test set.
test(model, test_input, test_target)

tensor(0.9260)