In [66]:
import numpy as np
import random as rdm
import math
import time

In [67]:
def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def relu(X):
    return np.maximum(0,X)

def sigmoid_prime(x):
    return sigmoid(x)*(1-sigmoid(x))

def relu_prime(X):
    return np.array([1 if x >= 0 else 0 for x in X])

def activation(name):
    if name =='sigmoid':
        return sigmoid,sigmoid_prime
    elif name=='relu':
        return relu,relu_prime
    else:
        raise Exception('activation function not in [sigmoid,relu]')
        
def mean_square_error(y_hat,y):
    return np.square(y_hat - y).mean(axis = 1)

def mean_square_error_der(y_hat,y):
    if len(y) > 1:
        return 2*(y_hat - y).mean(axis = 1)
    else:
        return 2*(y_hat - y)

def square_sum(X):
    return np.square(X).sum()

def loss_function(name):
    if name=='mean_square_error':
        return mean_square_error , mean_square_error_der
    else:
        raise Exception('loss function not in [mean_square_error]')

def penalization(name):
    if name == 'square_sum':
        return square_sum
    else:
        raise Exception('penalization function not in [square_sum]')

In [86]:
class NeuralNetwork:
    def __init__(self,layers_parameters , global_parameters):
        self.layers = {'layer_' + str(c):
                         {'W' : 0.1*np.random.random(size = (layers[l]['output_dim'],layers[l]['input_dim'])),
                          'B' : 0.1*np.random.random(size = (layers[l]['output_dim'],1)),
                          'g' : activation(layers[l]['activation'])[0],
                          'g_prime': activation(layers[l]['activation'])[1],
                          'shape': {'W':(layers[l]['output_dim'],layers[l]['input_dim']),
                                    'B':(layers[l]['output_dim'],1),
                                    'g_prime':(layers[l]['output_dim'],1),
                                    'input':(layers[l]['input_dim'],1),
                                    'output':(layers[l]['output_dim'],1)}
                         } for c,l in enumerate(layers)}
        
        self.input_dim = global_parameters['input']
        self.output_dim = global_parameters['output']
        self.layers_dim = global_parameters['layers']
        self.batch_size = global_parameters['batch']
        self.learning_rate = global_parameters['learning_rate']
        self.train_test_split = parameters['train_test_split']
        
        self.forward_memory = {}
        self.backward_memory= {}
        
        self.loss = loss_function(global_parameters['loss'])[0]
        self.loss_prime = loss_function(global_parameters['loss'])[1]
        
    def predict(self,X):
        #doit gérer les entrées en tableau
        return np.array([self.forward(x) for x in X]).reshape((len(X),self.output_dim))
    
    def loss_calculation(self,x,y):
        y_hat = self.predict(X)
        return self.loss(y_hat,h)
    
    def forward(self,x):
        self.foward_memory = {}
        
        if len(x) != self.input_dim:
            raise Exception('input of size {} instead of {}'.format(len(x),self.input_dim))
            
        x = x.reshape((self.input_dim,1))
        
        for n in range(self.layers_dim):
            name = 'layer_' + str(n)
            z = np.dot(self.layers[name]['W'] , x) + self.layers[name]['B'].reshape(self.layers[name]['shape']['output'])
            h = self.layers[name]['g'](z).reshape(self.layers[name]['shape']['output'])
            x = x.reshape(self.layers[name]['shape']['input'])
            
            self.forward_memory[name] = [x , z , h]
            
            x = np.array(h)
            
        return h
    
    def backward(self , y):
        
        if len(self.forward_memory) != self.layers_dim:
            raise Exception('dimensions do not match')
        
        last_layer = 'layer_' + str(self.layers_dim - 1)
        
        dJ_dh = self.loss_prime(self.forward_memory[last_layer][2] , y.reshape((2,1)))
        dJ_dh.reshape((self.output_dim , 1))
        
        for i in range(self.layers_dim):
            current_layer_name = 'layer_' + str(self.layers_dim - i - 1)
            x , z , h = self.forward_memory[current_layer_name]
            n , m = self.layers[current_layer_name]['shape']['output'] , self.layers[current_layer_name]['shape']['input']
            
            if x.shape != m or z.shape != n or h.shape != n:
                x.reshape(m)
                z.reshape(n)
                h.reshape(n)
            
            dJ_dz = dJ_dh * self.layers[current_layer_name]['g_prime'](z)
            
            if dJ_dz.shape != n:
                dJ_dz = dJ_dz.reshape(n)
                
            dJ_dw = np.dot(dJ_dz,x.T)
            dJ_db = dJ_dz
            
            self.backward_memory[current_layer_name] = {'dJ_dw':dJ_dw,'dJ_db':dJ_db}
            
            dJ_dh = np.dot(self.layers[current_layer_name]['W'].T,dJ_dz)
                        
        return True
    
    def train(self , X , Y , epoch):
        if len(X) != len(Y):
            raise Exception('X and Y dimension do not match')
        
        sc = self.score(X,Y)
        print('initial score: ',sc,end = '\n')
        for l in range(epoch):
            t0 = time.time()
            data = list(zip(X, Y))
            rdm.shuffle(data)
            X, Y = zip(*data)
            X_test = X[:int(self.train_test_split*len(X))]
            Y_test = Y[:int(self.train_test_split*len(X))]
            X_train = X[int(self.train_test_split*len(X)):]
            Y_train = Y[int(self.train_test_split*len(X)):]
            
            n = int(len(X_train)/self.batch_size)
            for i in range(n):
                gradient_update = {}
                for k in range(self.batch_size):
                    current_element = i*self.batch_size + k
                    x , y = X_train[current_element] , Y_train[current_element]
                    temp = self.forward(x)
                    self.backward(y)
                    if k==0:
                        gradient_update = self.backward_memory
                    else:
                        for layer in self.backward_memory:
                            gradient_update[layer]['dJ_dw'] += self.backward_memory[layer]['dJ_dw']
                            gradient_update[layer]['dJ_db'] += self.backward_memory[layer]['dJ_db']
                
                for layer in self.layers:
                    self.layers[layer]['W'] -= self.learning_rate/self.batch_size * gradient_update[layer]['dJ_dw']
                    self.layers[layer]['B'] -= self.learning_rate/self.batch_size * gradient_update[layer]['dJ_db']
            
            
            train_score = self.score(X_train,Y_train)
            test_score = self.score(X_test,Y_test)
            t = time.time() - t0
            results = 'epoch n°{} realized, training score: {}, test score: {}, time: {}'.format(l,train_score,test_score,t)
            print(results, end = '\n')
            
    def score(self,X,Y):
        return np.sqrt(np.square(self.predict(X) - Y).mean(axis = 1).mean(axis = 0))

In [87]:
layers = {
          0:{'input_dim' : 2,
             'output_dim': 3,
             'activation': 'sigmoid'},
          1:{'input_dim' : 3,
             'output_dim': 2,
             'activation': 'relu'}
         }

parameters = {'input': 2,
              'output' : 2,
              'layers': 2,
              'loss': 'mean_square_error',
              'penalization':'square_sum',
              'batch':1,
              'learning_rate':0.1,
              'train_test_split':0.25}

nn = NeuralNetwork(layers , parameters)

In [88]:
# data = []
# for i in range(10000):
#     y = i%2
#     dist = np.random.normal(loc = 1 + i%2,scale = 0.2)
#     x0 = np.random.uniform(low = -np.sqrt(dist/2),high = np.sqrt(dist/2))
#     sign = -1 if np.random.random() > 0.5 else 1
#     x1 = sign*np.sqrt(dist - x0**2)
#     data.append([x0,x1,y])
# X = np.array(data)[:,:2]
# Y = np.array(data)[:,2]

In [89]:
data = []
for i in range(100000):
    x , y = np.random.random() , np.random.random()
    norme = np.square(x ** 2 + y ** 2)
    angle = math.atan(y/x)
    data.append([x,y,norme,angle])
X = np.array(data)[:,:2]
Y = np.array(data)[:,2:]

In [90]:
nn.train(X,Y,epoch = 10)

initial score:  0.8182065318623806
epoch n°0 realized, training score: 0.07846004101615012, test score: 0.07832388071630904, time: 9.090514659881592
epoch n°1 realized, training score: 0.06960240927186599, test score: 0.06967994823824569, time: 8.873120546340942
epoch n°2 realized, training score: 0.06880796115167938, test score: 0.06881606065819056, time: 9.634108543395996
epoch n°3 realized, training score: 0.06942525959070156, test score: 0.06971401119122868, time: 9.184272050857544
epoch n°4 realized, training score: 0.06984652237761488, test score: 0.07006190243953674, time: 9.518407821655273
epoch n°5 realized, training score: 0.06965905396931389, test score: 0.06929108695835487, time: 9.109460592269897
epoch n°6 realized, training score: 0.08548790820498471, test score: 0.08478612315186143, time: 9.125425100326538
epoch n°7 realized, training score: 0.08540611319905687, test score: 0.08515693336144162, time: 9.481502532958984
epoch n°8 realized, training score: 0.068966573352100

In [92]:
X[0],nn.predict([X[0]]),Y[0]

(array([0.41052309, 0.38017066]),
 array([[0.10411922, 0.74336785]]),
 array([0.0980059 , 0.74702991]))

In [13]:
nn.backward(Y[0])

True

In [16]:
X[0],nn.layers

(array([0.98929607, 0.56288807]),
 {'layer_0': {'W': array([[0.00018588, 0.09916613],
          [0.08459164, 0.08656696],
          [0.07856979, 0.00026142]]),
   'B': array([[0.05622521],
          [0.02995821],
          [0.01942281]]),
   'g': <function __main__.sigmoid(x)>,
   'g_prime': <function __main__.sigmoid_prime(x)>,
   'shape': {'W': (3, 2),
    'B': (3, 1),
    'g_prime': (3, 1),
    'input': (2, 1),
    'output': (3, 1)}},
  'layer_1': {'W': array([[0.05346489, 0.09512572, 0.01652686],
          [0.0272123 , 0.02087117, 0.08771289]]),
   'B': array([[0.08309541],
          [0.09143422]]),
   'g': <function __main__.relu(X)>,
   'g_prime': <function __main__.relu_prime(X)>,
   'shape': {'W': (2, 3),
    'B': (2, 1),
    'g_prime': (2, 1),
    'input': (3, 1),
    'output': (2, 1)}}})

In [15]:
nn.forward_memory

{'layer_0': [array([[0.98929607],
         [0.56288807]]),
  array([[0.11222853],
         [0.1623719 ],
         [0.09729874]]),
  array([[0.52802772],
         [0.54050402],
         [0.52430551]])],
 'layer_1': [array([[0.52802772],
         [0.54050402],
         [0.52430551]]),
  array([[0.17140731],
         [0.16307238]]),
  array([[0.17140731],
         [0.16307238]])]}

In [14]:
nn.backward_memory

{'layer_1': [array([[-1.59151958, -1.62912421, -1.58030054],
         [-0.37408094, -0.38291977, -0.37144394]]),
  array([[-3.0140834 ],
         [-0.70844943]])],
 'layer_0': [array([[-0.04448351, -0.02531016],
         [-0.0740796 , -0.04214969],
         [-0.02762335, -0.01571709]]),
  array([[-0.04496481],
         [-0.07488112],
         [-0.02792223]])]}

In [386]:
nn.train(X[:75000],Y[:75000])

74999

0.002075081946097534

In [387]:
nn.score(X[:75000],Y[:75000]),nn.score(X[75000:],Y[75000:])

(0.002075081946097534, 0.0036048963836632646)

In [278]:
np.maximum(0,X)

array([[0.       ],
       [0.5868795]])

In [17]:
np.array([1,1]) + np.array([2,3])

array([3, 4])

In [145]:
z = np.dot(nn.layers[0]['W'] , X[0].reshape(2,1)) + nn.layers[0]['B']
z = z.reshape(nn.layers[0]['shape']['output'])

In [130]:
type(X[0].reshape((2,1)))

numpy.ndarray

In [147]:
nn.layers[0]['W']

array([[array([0.13445301]), array([1.24396206])],
       [array([0.54997813]), array([1.73574006])],
       [array([-1.0244631]), array([0.7470272])]], dtype=object)

In [142]:
nn.layers[0]['g'](np.array([1,2]).reshape(2,1))

array([[0.73105858],
       [0.88079708]])