In [4]:
import torch
import sys
sys.path.append('../')
from module import Module

In [16]:
import typing

In [5]:
class Relu(Module):
    """Rectified linear unit activation, ``max(0, x)`` applied element-wise.

    ``forward`` caches the tensor it receives so that ``backward`` can evaluate
    the derivative at the same point.
    """

    def forward(self, *input):
        """Apply ReLU to ``input[0]`` and return the activated tensor.

        The tensor received (the layer's input, despite the attribute name) is
        kept on ``self.output`` for use by ``backward``.
        """
        self.output = input[0]

        return self.relu(self.output)

    def backward(self, *gradwrtoutput):
        """Return ``gradwrtoutput[0]`` multiplied element-wise by the ReLU
        derivative evaluated at the tensor cached by the last ``forward``.
        """
        derivatives = self.relu_p(self.output)

        return derivatives * gradwrtoutput[0]

    def relu(self, x):
        """Return ``x`` with every negative entry replaced by 0."""
        return torch.clamp(x, min=0)

    def relu_p(self, x):
        """Return the ReLU derivative at ``x``: 1 where ``x > 0``, else 0.

        A new tensor is returned and ``x`` is left untouched. ``x`` is the very
        tensor cached by ``forward`` (and therefore the caller's own input), so
        overwriting it in place would corrupt both the cache and the caller's
        data after the first ``backward`` call.
        """
        return (x > 0).type_as(x)

In [6]:
class Tanh(Module):
    """Hyperbolic tangent activation applied element-wise.

    ``forward`` caches the tensor it receives so that ``backward`` can evaluate
    the derivative at the same point.
    """

    def forward(self, *input):
        """Apply tanh to ``input[0]`` and return the activated tensor.

        The tensor received (the layer's input, despite the attribute name) is
        kept on ``self.output`` for use by ``backward``.
        """
        self.output = input[0]

        return self.tanh(self.output)

    def backward(self, *gradwrtoutput):
        """Return ``gradwrtoutput[0]`` multiplied element-wise by the tanh
        derivative evaluated at the tensor cached by the last ``forward``.
        """
        derivatives = self.tanh_p(self.output)

        return derivatives * gradwrtoutput[0]

    def tanh(self, to_compute):
        """Return tanh of ``to_compute`` without overflowing for large inputs.

        Uses ``tanh(x) = sign(x) * (1 - e^{-2|x|}) / (1 + e^{-2|x|})``.
        The exponent is never positive, so the exponential stays in (0, 1].
        The direct ``(e^x - e^-x) / (e^x + e^-x)`` form produces ``inf / inf``
        (NaN) once ``e^|x|`` exceeds the floating-point range.
        """
        decay = torch.exp(-2 * torch.abs(to_compute))

        return torch.sign(to_compute) * (1 - decay) / (1 + decay)

    def tanh_p(self, x):
        """Return the tanh derivative at ``x``: ``1 - tanh(x)^2``."""
        return (1 - torch.pow(self.tanh(x), 2))

In [7]:
class Softmax(Module):
    """Softmax activation over all elements of the input tensor.

    The input is flattened, so the whole tensor is treated as one vector of
    scores and the output is a 1-D tensor that sums to 1.
    """

    def forward(self, *input):
        """Cache ``input[0]`` and return its softmax as a 1-D tensor."""
        self.input_from_layer = input[0]

        # The cached attribute must be used here; a bare ``input_from_layer``
        # is an undefined name and raises NameError.
        return self.softmax(self.input_from_layer)

    def backward(self, *gradwrtoutput):
        """Return the gradient with respect to the cached input.

        The softmax Jacobian is a full (n, n) matrix, so the chain rule is a
        matrix-vector product with ``gradwrtoutput[0]``, not an element-wise
        product. The Jacobian is symmetric, so no transpose is needed. The
        result is reshaped to the shape of the cached input.
        """
        jacobian = self.softmax_p(self.input_from_layer)
        grad_output = gradwrtoutput[0].contiguous().view(-1)

        return torch.mv(jacobian, grad_output).view_as(self.input_from_layer)

    def softmax(self, input_to_compute):
        """Return the softmax of the flattened ``input_to_compute``.

        The maximum is subtracted before exponentiating so that the largest
        exponent is 0; this does not change the result.
        """
        flat_scores = input_to_compute.view(-1, 1)

        shifted_scores = flat_scores - flat_scores.max()

        exponentials = torch.exp(shifted_scores)

        return (exponentials / torch.sum(exponentials)).view(-1)

    def softmax_p(self, input_to_compute_p):
        """Return the (n, n) softmax Jacobian at ``input_to_compute_p``.

        Entry (i, j) is ``s_i * (1 - s_i)`` when ``i == j`` and ``-s_i * s_j``
        otherwise, where ``s`` is the softmax output. This is
        ``diag(s) - s s^T``, computed here without a Python double loop.
        """
        softmax_res = self.softmax(input_to_compute_p)

        outer_product = softmax_res.view(-1, 1) * softmax_res.view(1, -1)

        return torch.diag(softmax_res) - outer_product

In [8]:
class Sequential:
    """Container that chains layers into a feed-forward network.

    Every layer is expected to provide ``forward(x)``, ``backward(grad)`` and
    ``param()``.
    """

    def __init__(self, layers):
        """Store ``layers``, the ordered sequence of layers (first layer first)."""
        self.layers = layers

    def forward(self, initial_input):
        """Feed ``initial_input`` through every layer in order.

        Each layer receives the output of the layer before it. The output of
        the last layer is returned.
        """
        output_single_layer = initial_input

        for layer in self.layers:
            output_single_layer = layer.forward(output_single_layer)

        return output_single_layer

    def backward(self, initial_backward_input):
        """Back-propagate ``initial_backward_input`` through the layers.

        Starting from the derivative of the loss, each layer's ``backward`` is
        called in reverse order with the result of the layer after it. The
        value returned by the first layer's ``backward`` is returned.
        """
        output_single_layer_backward = initial_backward_input

        for layer in self.layers[::-1]:
            output_single_layer_backward = layer.backward(output_single_layer_backward)

        return output_single_layer_backward

    def param(self):
        """Return a list with one entry per layer: that layer's ``param()`` result.

        The method is called on each layer; appending ``layer.param`` without
        the call would collect bound methods rather than parameters.
        """
        return [layer.param() for layer in self.layers]
            

In [18]:
import math

class DataGenerator:
    """Synthetic binary-classification data: uniform points labelled by a disc.

    Points are drawn uniformly from [0, 1) in every feature. A point gets
    label 1 when its first two coordinates fall strictly inside the circle of
    radius ``1 / sqrt(2 * pi)`` centred at (0.5, 0.5), and label 0 otherwise.
    """

    def __init__(self, number_of_examples: int, number_of_features: int, batch_size = 2, shuffle= True):
        """Record the data-set size, the batch size and the shuffling flag."""
        self.number_of_examples = number_of_examples
        self.number_of_features = number_of_features
        self.batch_size = batch_size
        self.shuffle = shuffle

        # Geometry of the disc that defines the positive class.
        self.center_circle = [0.5, 0.5]
        self.radius_circle = 1 / math.sqrt(2 * math.pi)

    def check_target(self, input_example):
        """Return 1 if ``input_example`` lies strictly inside the disc, else 0.

        Only the first two coordinates of ``input_example`` are read.
        """
        offset_x = input_example[0] - self.center_circle[0]
        offset_y = input_example[1] - self.center_circle[1]
        squared_distance = math.pow(offset_x, 2) + math.pow(offset_y, 2)

        return 1 if squared_distance < math.pow(self.radius_circle, 2) else 0

    def generate_data(self):
        """Sample the inputs, label them and return ``(data, targets)``.

        ``data`` is a float tensor of shape (number_of_examples,
        number_of_features). ``targets`` is a long tensor with one 0/1 label
        per example. Both are also stored on the instance.
        """
        self.data = torch.FloatTensor(self.number_of_examples, self.number_of_features).uniform_(0, 1)

        labels = [self.check_target(self.data[row]) for row in range(self.number_of_examples)]
        self.targets = torch.LongTensor(labels)

        return self.data, self.targets

    def yield_data(self):
        """Yield ``(inputs, targets)`` mini-batches covering the data set once.

        When ``shuffle`` equals ``True`` a fresh random permutation is drawn
        and the permuted copies are stored as ``data_shuffled`` and
        ``targets_shuffled``. The last batch is shorter when the number of
        examples is not a multiple of ``batch_size``. ``generate_data`` must
        have been called beforehand.
        """
        if self.shuffle == True:
            order = torch.randperm(self.number_of_examples)
            self.data_shuffled = self.data[order]
            self.targets_shuffled = self.targets[order]
            inputs, labels = self.data_shuffled, self.targets_shuffled
        else:
            inputs, labels = self.data, self.targets

        for batch_start in range(0, self.number_of_examples, self.batch_size):
            # Clip the final batch to whatever examples remain.
            batch_length = min(self.batch_size, self.number_of_examples - batch_start)
            yield inputs.narrow(0, batch_start, batch_length),\
                  labels.narrow(0, batch_start, batch_length)
        

In [11]:
class Linear(Module):
    """Fully connected layer computing ``weights @ x + bias`` for a 1-D input."""

    def __init__(self, in_features, out_features):
        """Create the layer's parameters.

        :param in_features: number of input neurons
        :param out_features: number of output neurons
        """
        # Number of input neurons
        self.in_features = in_features
        # Number of output neurons
        self.out_features = out_features

        # Xavier-style initialization of the weights:
        # first draw the weights from a normal distribution with mean 0 and std 1,
        # then multiply the samples by sqrt(1 / (in_features + out_features)).
        # The attribute is named ``weights`` because that is the name ``forward`` reads.
        self.weights = torch.mul(torch.Tensor(out_features, in_features).normal_(mean=0, std=1),
                                 torch.sqrt(torch.FloatTensor([1.0 / (self.in_features + self.out_features)])))
        # Alias (same tensor object) kept for code that used the original
        # attribute name ``weight``.
        self.weight = self.weights

        # Zero bias initialization
        self.bias = torch.Tensor(out_features).zero_()

    def forward(self, *input):
        """Return ``weights @ input[0] + bias``.

        ``input[0]`` must be a 1-D tensor with ``in_features`` entries, because
        the product is computed with ``torch.mv``. Both the input and the
        output are cached on the instance.
        """
        # Input from the previous layer
        self.input_from_layer = input[0]

        # Matrix-vector product of the weights with the input, plus the bias
        self.output = torch.mv(self.weights, self.input_from_layer) + self.bias

        return self.output

    def backward(self, *gradwrtoutput):
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def param(self):
        """Return an empty list; no parameters are exposed yet."""
        return []
    

In [None]:
# NOTE(review): draft fragment. `self` is not defined at cell level and `Tensor`
# is not imported in the visible cells (only `torch` is), so this cell cannot run
# on its own. It reads `self.weights` and `self.bias`, so it looks intended for
# Linear.__init__ — confirm before moving it there.
# initialize the tensors to accumulate the gradients during backprop
# remember to initialize to zero at the beginning of every mini-batch step
self.dl_dw = Tensor(self.weights.size()).zero_()
self.dl_db = Tensor(self.bias.size()).zero_()

In [None]:
def backward(self, *gradwrtoutput):
    """Back-propagate through a linear layer and accumulate its gradients.

    Written as a method: ``self`` must provide ``weights``, the gradient
    accumulators ``dl_dw`` and ``dl_db``, and ``input_prec_layer`` (the input
    seen during the forward pass).

    NOTE(review): ``Linear.forward`` in this notebook caches its input under
    the name ``input_from_layer``, not ``input_prec_layer`` — reconcile the
    names before attaching this function to ``Linear``.

    :param gradwrtoutput: ``gradwrtoutput[0]`` is the 1-D gradient with respect
        to this layer's output
    :return: the gradient with respect to this layer's input
    """
    # for a linear layer l, the gradwrtoutput will be the grad output
    # from the activation module, that is the product of dsigma(s_{l})
    # and the grad wrt the output of the activation function
    grad_wrt_s_l = gradwrtoutput[0]

    # compute the grad wrt the input of previous layer (x_{l-1})
    grad_wrt_input_prev_layer = self.weights.t().mv(grad_wrt_s_l)

    # compute the grad wrt the weights of this layer (outer product of the
    # output gradient and the layer input) and accumulate it in place
    self.dl_dw.add_(grad_wrt_s_l.view(-1, 1).mm(self.input_prec_layer.view(1, -1)))

    # compute grad wrt the bias term and accumulate it in place
    self.dl_db.add_(grad_wrt_s_l)

    return grad_wrt_input_prev_layer


def param(self):
    """
    returns pairs of tensors: first is a parameter tensor,
    the second is the gradient accumulator for this parameter tensor
    :return: ``[(weights, dl_dw), (bias, dl_db)]``
    """
    return [(self.weights, self.dl_dw), (self.bias, self.dl_db)]

In [12]:
class CrossEntropy:
    """Cross-entropy loss and its derivative for class-probability predictions.

    Both functions are static, so they can be called on the class or on an
    instance.
    """

    @staticmethod
    def cross_entropy_torch(predictions, targets, epsilon=1e-20):
        """
        Computes cross entropy between targets
        and predictions.
        Input: predictions (N, #ofclasses) FloatTensor
               targets (N,) LongTensor of class indices
               epsilon: predictions are clamped to [epsilon, 1 - epsilon]
        Returns: average loss
        """
        # Clamping the predictions between epsilon and 1 - epsilon so that
        # the logarithm below never receives 0
        predictions_clamped = predictions.clamp(epsilon, 1 - epsilon)
        # Obtaining the probabilities of the target class; gather needs the
        # indices as an (N, 1) column, hence the unsqueeze
        to_compute_loss = predictions_clamped.gather(1, targets.unsqueeze(1))

        # Computing the loss
        log_loss = -torch.log(to_compute_loss)

        # Computing the mean of the loss
        average_ce_loss = torch.mean(log_loss)

        return average_ce_loss

    @staticmethod
    def cross_entropy_troch_p(predictions, targets, epsilon=1e-20):
        """
        Computes cross entropy derivative between targets
        and predictions.
        Input: predictions (N, #ofclasses) FloatTensor
               targets (N,) LongTensor of class indices
               epsilon: predictions are clamped to [epsilon, 1 - epsilon]
        Returns: (clamped predictions - one_hot(targets)) / N, same shape
                 as predictions

        NOTE(review): this expression has the form of the gradient with
        respect to the scores fed to a softmax, not with respect to the
        probabilities themselves — confirm how it is chained with the
        activation's backward pass.
        """
        # Number of examples
        numb_ex = targets.shape[0]

        # Clamping returns a new tensor, so the caller's predictions are not
        # modified by the in-place subtraction below
        predictions_clamped = predictions.clamp(epsilon, 1 - epsilon)

        # Subtract 1 from the predicted probability of each target class
        predictions_clamped[range(predictions_clamped.shape[0]), targets] -= 1

        derivative = predictions_clamped / numb_ex

        return derivative

    # Correctly spelled alias; the original (misspelled) name is kept so that
    # existing callers continue to work.
    cross_entropy_torch_p = cross_entropy_troch_p

In [13]:
class Mse:
    """Halved mean-squared-error loss and its gradient."""

    def mse(self, predictions, targets):
        """Return ``sum((predictions - targets)^2) / (2 n)`` as a scalar tensor,
        where ``n`` is the total number of elements of ``predictions``.
        """
        # 1/2n (x - y) ^2
        return (torch.pow(predictions - targets, 2).mean()) / 2

    def mse_p(self, predictions, targets):
        """Return the gradient of ``mse`` with respect to ``predictions``.

        The divisor is the total number of elements, matching the ``mean()``
        used by ``mse``. Dividing by ``len(predictions)`` (the size of the
        first dimension only) gives the same result for 1-D inputs but a
        gradient that is too large for inputs with more dimensions.
        """
        # 1/n (x - y)
        return (predictions - targets) / predictions.numel()

In [14]:
class StochasticGD:
    """Skeleton of a stochastic-gradient-descent optimizer.

    Only the configuration is stored; the update rule is not written yet.
    """

    def __init__(self, parameters_list, learning_rate = 0.01):
        """Remember the parameters to optimise and the step size."""
        self.learning_rate = learning_rate
        self.parameters_list = parameters_list

    def update_parameters(self):
        """Placeholder for the update step; always raises ``NotImplementedError``."""
        raise NotImplementedError

In [36]:
# ---- Experiment configuration ----
samples = 1000       # number of examples in each of the two data sets
features = 2         # number of input features per example
batch = 100          # mini-batch size
to_shuffle = True    # draw the batches in random order
# ----------------------------------

# One generator object per split, both built from the same configuration.
train_class = DataGenerator(samples, features, batch_size=batch, shuffle=to_shuffle)
test_class = DataGenerator(samples, features, batch_size=batch, shuffle=to_shuffle)

# Sample the points and their labels (training split first, then test split).
train_data, train_target = train_class.generate_data()
test_data, test_target = test_class.generate_data()

# Mini-batch iterators; each one covers its data set a single time.
train_generator = train_class.yield_data()
test_generator = test_class.yield_data()

In [37]:
# Four fully connected layers: features -> 25 -> 25 -> 25 -> 2
input_layer = Linear(features, 25)
first_hidden_layer = Linear(25, 25)
second_hidden_layer = Linear(25, 25)
output_layer = Linear(25, 2)

In [None]:
# Builds a Linear/Relu/Tanh network, an optimizer and a trainer.
# NOTE(review): this cell appears to target a different API than the one defined
# in the visible cells — confirm where these names are meant to come from:
#   * `generate_data` exists above only as a DataGenerator method, not as a
#     module-level function taking (1000, 2);
#   * `DataLoader`, `SGD`, `Trainer` and `LossMSE` are not defined in the visible
#     cells (the classes above are named `StochasticGD` and `Mse`);
#   * `Linear.__init__` above takes `in_features` / `out_features`, not
#     `input_dim` / `output_dim`;
#   * `Sequential` above defines `param()`, not `get_params()`.
batch_size = 10

# generating our data
train_inputs, train_targets = generate_data(1000, 2)
test_inputs, test_targets = generate_data(1000, 2)

# creating our loaders for training and test sets
train_loader = DataLoader(train_inputs, train_targets, batch_size)
test_loader = DataLoader(test_inputs, test_targets, batch_size)

# defining our layers: the first layer's input size is read from the first
# training example, followed by two hidden layers of 25 units and 2 outputs
layers = [Linear(input_dim=train_inputs[0].shape[0], output_dim=25), Relu(),
          Linear(input_dim=25, output_dim=25), Relu(),
          Linear(input_dim=25, output_dim=2), Tanh()]

# creating our model
model = Sequential(layers)

# init our optimizer
optimizer = SGD(model.get_params(), lr=0.01)

# init our trainer
trainer = Trainer(model=model,
                  optimizer=optimizer,
                  epochs=500,
                  loss=LossMSE(),
                  train_loader=train_loader,
                  test_loader=test_loader)
