In [16]:
from torch import empty
import math

In [17]:
empty(10,10).shape

torch.Size([10, 10])

In [82]:
#I dont know why we need this?
class Module ( object ):
    
    def forward (self , input_ ):
        return input_
    def backward (self, grad):
        
        #Call backward() for previous module
        if self.prev_module is not None:
            prev_grads = self.prev_module.backward(grad)
            
    def param ( self ):
        return []

In [83]:
class LossMSE(Module):
    
    def __init__(self, prev_module = None):

        self.prev_module =  prev_module
        
    def set_truth(self,y_true):
        self.y_true = y_true
        
    def forward (self , input_ ):
        assert input_.shape[1] == self.y_true.shape[1], "Input and output size must match!"
        self.curr_input = input_
        return (self.y_true-input_).square().mean()
        
    def backward (self):
        #Calculate gradient
        grad = -2 *(self.y_true-self.curr_input) / (self.curr_input.shape[1])        
        
        #Call backward() for previous module
        if self.prev_module is not None:
            prev_grads = self.prev_module.backward(grad)
    
    def param ( self ):
        return []
    

In [84]:
class ReLU(Module):
    
    def __init__(self, prev_module = None):
        self.prev_module =  prev_module
        self.curr_grad = 0 #Temporary

    def forward (self , input_ ):
        self.curr_grad = (input_ > 0)
        return input_ * self.curr_grad
        
    def backward (self , gradwrtoutput):
        #Calculate gradient
        grad = self.curr_grad * gradwrtoutput
        
        #Call backward() for previous module
        if self.prev_module is not None:
            prev_grads = self.prev_module.backward(grad)
    
    def param ( self ):
        return []
    
class Tanh(Module):
    
    def __init__(self, prev_module = None):
        self.prev_module =  prev_module
        self.curr_grad = 0 #Temporary

    def forward (self , input_ ):
        self.curr_grad = input_.tanh().power(2).multiply(-1).sum(1)
        return input_.tanh()
        
    def backward (self , gradwrtoutput):
        #Calculate gradient
        grad = self.curr_grad * gradwrtoutput
        
        #Call backward() for previous module
        if self.prev_module is not None:
            prev_grads = self.prev_module.backward(grad)
    
    def param ( self ):
        return []

In [227]:
class FCC(Module):
    
    def __init__(self, input_size, output_size, prev_module = None, lr=1e-1, N = None):
        self.input_size = input_size
        self.output_size = output_size
        self.prev_module =  prev_module
        # Xavier initialization
        self.weights = empty(input_size, output_size).normal_(0, math.sqrt(2/(input_size + output_size)))
        self.bias = empty(1,output_size).fill_(0.1) #TODO better init
        self.curr_input = 0
        self.lr = lr

    def forward (self , input_ ):
        assert input_.shape[1] == self.input_size, "Input size must match!" 
        out = input_ @ (self.weights) 
        out += self.bias
        print(input_.size(), self.weights.size(), out.size(), 'FCC forward')
        assert out.shape[1] == self.output_size, "Output size must match!" 
        self.curr_input = input_
        return out
        
    def backward (self , gradwrtoutput):
        #Calculate gradient
        grad = gradwrtoutput.matmul(self.weights.transpose(1, 0))
        
        #update weights
#         self.update(gradwrtoutput, self.lr)
        self.update(grad, self.lr)
        
        #Call backward() for previous module
        if self.prev_module is not None:
            prev_grads = self.prev_module.backward(grad)
    
    def update(self, gradwrtoutput, learning_rate):
        print('Grad wrt output, weights, curr_input.T, ' )
        print(gradwrtoutput.size(), self.weights.size(), self.curr_input.T.size(), 'FCC backward')
        self.weights -= learning_rate * ( gradwrtoutput.multiply(self.curr_input.T) )
        self.bias -= learning_rate * gradwrtoutput
        
    def param ( self ):
        return [self.weights, self.bias]
    

In [228]:
class NN_builder():
    
    def __init__(self):
        self.layers = []
        layer0 = FCC(2,10)
        self.layers.append(layer0)
        layer1 = ReLU(layer0)
        self.layers.append(layer1)
        layer2 = FCC(10,1, prev_module = layer1)
        self.layers.append(layer2)
        layer3 = ReLU(layer2)
        self.layers.append(layer3)
        layer4 = LossMSE(layer3)
        self.layers.append(layer4)
    
    def model_train(self,input_, g_truth):
        curr = input_
        self.layers[-1].set_truth(g_truth)
        for layer in self.layers:
            curr = layer.forward(curr)
        self.layers[-1].backward()        
        
    def model_eval(self,input_):
        curr = input_
        for layer in self.layers[:-1]:
            curr = layer.forward(curr)
            #print(curr)
        return curr
    

In [229]:
class Sequential():
    
    def __init__(self, layer_list, arguments, loss='MSE', lr = 1e-1):
        self.layers = []
        last_layer = None
        for idx ,layer_name in enumerate(layer_list):
            if(layer_name == 'FCC'):
                assert arguments[idx] != [], "FCC requires a tuple as input!"
                curr_layer =FCC(arguments[idx][0], arguments[idx][1], last_layer, lr=lr)
                self.layers.append(curr_layer)
                last_layer = curr_layer
            elif(layer_name == 'ReLU'):
                assert arguments[idx] == [], "Relu requires no input!"
                curr_layer = ReLU(last_layer)
                self.layers.append(curr_layer)
                last_layer = curr_layer
            elif(layer_name == 'Tanh'):
                assert arguments[idx] == [], "Tanh requires no input!"
                curr_layer = Tanh(last_layer)
                self.layers.append(curr_layer)
                last_layer = curr_layer
            else:
                raise Exception("No Module matches the input")

        if loss == 'MSE':
            curr_layer = LossMSE(last_layer)
            self.layers.append(curr_layer)
        else:
            raise Exception("No Loss matches the input")
                
    def train(self,input_, g_truth):
        out = input_
        self.layers[-1].set_truth(g_truth)
        for layer in self.layers[:-1]:
            out = layer.forward(out)
        loss = self.layers[-1].forward(out)
        self.layers[-1].backward()  
        return out,loss
        
    def eval(self,input_):
        out = input_
        for layer in self.layers[:-1]:
            out = layer.forward(out)
        return out

In [230]:
test = empty(1,2).fill_(5)
truth = empty(1,1).fill_(10)

In [231]:
truth

tensor([[10.]])

In [232]:
builder = NN_builder()

In [233]:
out = builder.model_eval(test)
# print(out)
out

torch.Size([1, 2]) torch.Size([2, 10]) torch.Size([1, 10]) FCC forward
torch.Size([1, 10]) torch.Size([10, 1]) torch.Size([1, 1]) FCC forward


tensor([[2.3078]])

### These two are just for a quick check I know they are terrible :D

In [234]:
def stupid_test_function(a,b):
    if a == 0 or b == 0:
        return 0
    else:
        return 1

In [235]:
def stupid_acc_func(pred,true):
    pred = pred.item() > 0.9
    return (pred == true.item())
    

In [236]:
seq = Sequential(["FCC","ReLU","FCC","ReLU"],[[2,10],[], [10,1], []],"MSE")
train_input = empty(100, 2).uniform_(0,1)
train_target = train_input.add(-0.5).pow(2).sum(1).sub(1 / (2*math.pi)).multiply(-1).sign().add(1).div(2)

test_input = empty(100, 2).uniform_(0,1)
test_target = test_input.add(-0.5).pow(2).sum(1).sub(1 / (2*math.pi)).multiply(-1).sign().add(1).div(2)


minibatch = 100
for i in range(0, train_input.size(0), minibatch):
    out,_ = seq.train(train_input.narrow(0, i, minibatch), truth)
    
test2 = empty(100,2).random_(0,2)
acc = 0
count = 0
for i in range(100):
    truth = empty(1,1).fill_(stupid_test_function(test2[i,0],test2[i,1]))
    inp = test2[i,:].unsqueeze(0)
    out = seq.eval(inp)
    if out.item() != 0:
        count = count + 1
    if stupid_acc_func(out,truth):
        acc = acc + 1
acc /100

torch.Size([100, 2]) torch.Size([2, 10]) torch.Size([100, 10]) FCC forward
torch.Size([100, 10]) torch.Size([10, 1]) torch.Size([100, 1]) FCC forward
Grad wrt output, weights, curr_input.T, 
torch.Size([100, 10]) torch.Size([10, 1]) torch.Size([10, 100]) FCC backward


RuntimeError: The size of tensor a (10) must match the size of tensor b (100) at non-singleton dimension 1

In [237]:
builder = NN_builder()
test = empty(1000,2).random_(0,2)
for i in range(1000):
    truth = empty(1,1).fill_(stupid_test_function(test[i,0],test[i,1]))
    inp = test[i,:].unsqueeze(0)
    out = builder.model_train(inp,truth)
    
test2 = empty(100,2).random_(0,2)
acc = 0
count = 0
for i in range(100):
    truth = empty(1,1).fill_(stupid_test_function(test2[i,0],test2[i,1]))
    inp = test2[i,:].unsqueeze(0)
    out = builder.model_eval(inp)
    if out.item() != 0:
        count = count + 1
    if stupid_acc_func(out,truth):
        acc = acc + 1
acc /100