In [10]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import random
import time
from sklearn import preprocessing
import multiprocessing as mp
import os

## Cost/activation

In [2]:
def cost_MSE(t,y_hat, derivative=0):
    if derivative:
            return -(t - y_hat)
    return np.mean(1/2*np.sum(np.power(t - y_hat, 2),\
                      axis=0))

def logistic_sigmoid(x, derivative=0):    
    sigm = 1/(1 + np.exp(-x))
    if len(sigm.shape) < 2:
        sigm = sigm.reshape(sigm.shape[0],1)
        
    if derivative:
        return sigm*(1. - sigm)
    return sigm

## NN backend

In [185]:
# >>>>>>>>>>>>>>>>>>> init_weights_biases >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# W, B = init_weights_biases(4, 3, [2,2])
#no_hidden_units: needs a list with at least one element
def init_weights_biases(no_of_features, no_outputs, no_hidden_units, seed=1):
    
    W = []
    B = []
    rows, columns = 0, 0 
    last = len(no_hidden_units)
    np.random.seed(seed)
    
    if no_hidden_units: #list is not empty
        for i in range(last+1):
            if i == 0: #first weight
                rows = no_hidden_units[i]
                columns = no_of_features
            elif i > 0 and i < last:
                rows = no_hidden_units[i]
                columns = no_hidden_units[i-1]
            else: #last
                columns = rows # list ran out of indeces, so use last one
                rows = no_outputs            

            W.insert(i, np.random.randn(rows, columns))
            B.insert(i, np.zeros((rows, 1)))
    else: # no hidden units (perceptron)
        W.insert(0, np.random.randn(no_outputs, no_of_features))
        B.insert(0, np.zeros((no_outputs, 1)))
    
    dummy_param = 0
    param = 0
    for i in range(len(W)):
        dummy_param = W[i].shape[0] * W[i].shape[1]
        param += dummy_param
        
#     W.append(param) #number of learnable weights
    
    return W, B, param

# >>>>>>>>>>>>>>>>>>> forward_prop >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    # W1, b1, W2, b2 = init_weights_biases(no_hidden_units=8)
    # Z, A, Y = forward_prop(W, B, X)
    # X has n features x M samples
def forward_prop(W, B, X):
    no_of_samples = X.shape[1]
     #last weight matrix, rows correspond to outputs
    no_of_outputs = W[-2].shape[0] #index -1 is the number of learnable weights
    
    Z = []
    A = []
    A.append(X) #first layer is an activation
    
    for i in range(len(W)): #to avoid the last two indeces
        Z.insert(i, W[i] @ A[i] + B[i])
        A.insert(i+1, logistic_sigmoid(Z[i]))
    
    Y = np.zeros((no_of_samples, no_of_outputs))
    #scaling to making the pair a probability
    Y = np.divide(A[i+1], np.sum(A[i+1], axis=0)) #comuns are the samples now
    return Z, A, Y

# >>>>>>>>>>>>>>>>>>> backprop >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
# W1, b1, W2, b2 = init_weights_biases(no_hidden_units=8)
# A1, A2, Y = forward_prop(W1, b1, W2, b2, X)
# grad_mid_layer, grad_output = backprop(W2, A1, A2, X, Y, t)
# backprop(W2, A1, A2, X, Y, t)
def backprop(W, Z, A, Y_hat, T):
    
    output_index = len(W)-1 # if 3, starts at 2
    error = {}
    
    error_output = ( cost_MSE(T,Y_hat, derivative=1) * logistic_sigmoid(Z[-1], derivative=1))
    error[output_index] = error_output
    
    dJ_dW = {}
    for i in range(output_index-1,-1,-1):
         # doesn't get to W[0], so updated after the foor loop again
#         dJ_dW.insert(i+1, error[i+1] @ A[i+1].T)
        dJ_dW[i+1] = error[i+1] @ A[i+1].T
        
        error_dummy = (W[i+1].T @ error[i+1]) * logistic_sigmoid(Z[i], derivative=1)
#         error.insert(i, error_dummy)
        error[i] = error_dummy
    
    dJ_dW[0] = error[0] @ A[0].T
    
    return dJ_dW

## NN frontend 1 

    train, predict functions

In [1]:
# >>>>>>>>>>>>>>>>>>> train >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    # all samples (X), 4 x 2, are fed
    # X: dataset, n samples x N features
    # T: binary labels, n labels x L number of ouputs
    # hidden_layers : list with number of neurons for each inner layer.
        # e.g. [3, 4] will yield two layers with 3 and 4 units respectively
    # this function needs n samples > 1 (batch optimization).
def train(X, T, hidden_layers=[2], epochs=500, rho=.1, normalize_data=True, show_cost=0):    
    
    if normalize_data:
        scaler = preprocessing.StandardScaler()
        X = scaler.fit_transform(X)
    
    no_of_features = X.shape[1]
    no_samples = X.shape[0]
    no_outputs = T.shape[1]
    
    W, B, param = init_weights_biases(no_of_features, no_outputs, hidden_layers)
    
    Y_hat = np.zeros((no_outputs, X.shape[0]))
    
    j = 0
    idx_done = 0
    converged = False
    
    ###NESTED function
    def display_NN_info():
        print("* NN ************************************")
        print("   no. inputs (layer 1): " + str(no_of_features) )
        for k in range(len(hidden_layers)):
            print("   layer " + str(k+2) + ": " + str(hidden_layers[k]) + " units")
        print("   output layer ("+ str(k+3) + "): " + str(no_outputs) )
        print("   learnable weights: " + str(param) )
        print("   max epochs: " + "{:,}".format(epochs) )
        print("   learning rate(rho): " + str(rho) )
        
    display_NN_info()
    time.sleep(4)
    
    cost_final = []
    accuracy = []
    
    for i in range(epochs):
        
        Z, A, Y_hat = forward_prop(W, B, X.T)
        dJ_dW = backprop(W, Z, A, Y_hat, T.T)

        #grad descent
        for j in range(len(W)):
            W[j] = W[j] - rho*dJ_dW[j]

        Y = Y_hat.T

        y_and_T_match = np.allclose(Y, T, rtol=1e-03)        

        if y_and_T_match: #converged
            j += 1 
            if j == 3:
                idx_done = i + 1 # already predicts corretly all the time
            if j > 100: #makes the prediction more robust 
                # ( probability considered 1 == .60 or greater )
                converged = True
                break
    
        cost_final.append(cost_MSE(T, Y_hat.T))
        accuracy.append(calc_accuracy(T, Y_hat.T))
        
        if show_cost:
            print(str(i) + ": accur: "+ str(accuracy[i])+ "%")
            print(" cost: " + str(cost_final[i]))
    
    if show_cost:
        display_NN_info()
        plt.scatter(range(epochs), cost_final, s=1, color="red")
        plt.title("iterations X cost")
        plt.xlabel("iterations")
        plt.ylabel("cost")
        
    return W, B, Y, X, cost_final, epochs, idx_done, converged, rho, normalize_data, accuracy

# >>>>>>>>>>>>>>>>>>> predict >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    # X: dataset, n samples x N features
    #  train_pkg: list with [W, B, Y, X, cost_final, epochs, idx_done, converged, rho]
def predict(X, train_pkg):
    if len(X.shape) < 2:
        X = X.reshape(1,X.shape[0]) #for one sample
    
    normalized = train_pkg[-1]
    if normalized: #if the data has been normalized
        scaler = preprocessing.StandardScaler()
        X = scaler.fit_transform(X)
    
    Z, A, Y_hat = forward_prop(train_pkg[0], train_pkg[1], X.T)
    
    del Z, A
    return Y_hat.T, accuracy

def calc_accuracy(T, Y):
    matches = np.argmax(Y, axis=1) == np.argmax(T, axis=1)
    return len(matches[matches == True])/len(matches)*100

### NN functions 2 (helpers)

In [None]:
# reference for parallel processijng
def func(n):
    for i in range(10000):
        for j in range(10000):
            s=j*i
    print(n)
 
if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(func, range(10))
    pool.close()
    pool.join()   
    print('done')

# >>>>>>>>>>>>>>>>>>> train_all_gates >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def train_all_gates(X, t, hidden_layers=[2],\
                    iterations=500, rho=.01, print_cost=0):
    train_gates = {} #init dictionary

    for i in t:
        # NO_UNITS_L1 = 6  yields max matches with rho = 1 and epochs = 500
#         train_gates[i] : [W1, b1, W2, b2, X, Y, idx_done, epochs, converged, rho]
        train_gates[i] = train(X, t[i], NO_UNITS_L1=no_hidden_units,\
                               epochs=iterations, learning_rate=rho,\
                               show_cost=print_cost)
    
        train_gates[i] = train(X, t[i], hidden_layers=hidden_layers, epochs=epochs, rho=rho, show_cost=show_cost)

    return train_gates

# >>>>>>>>>>>>>>>>>>> match_logic_gate >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def match_logic_gate(train_pkg, T):

    Y = predict(train_pkg)
    prediction_match = np.array_equal(Y, T)
#         train_pkg : [W1, b1, W2, b2, X, Y, idx_done, epochs, converged, rho]
    # indeces used, especially:
    #                                 8. converged
    #                                 6. idx_done, 
    #                                 7. epochs
    #                                 9. rho
    match_pkg = [train_pkg[8], train_pkg[6],\
                  train_pkg[7], train_pkg[9],\
                  prediction_match, Y]
        
    return match_pkg

# >>>>>>>>>>>>>>>>>>> match_all_gate_outputs >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def match_all_gate_outputs(train_pkg_all_gates, t):
    matches = {}

    for i in t:
        matches[i] = match_logic_gate(train_pkg_all_gates[i], t[i])
        
    return matches

def print_match(match):
    print(i + " converged: " + str(match[0]))
    print("===========================================")
    print("  iter. to converge: " + str(match[1]))
    print("  iter. max: " + str(match[2]))
    if match[4]:
        print("  ==== CORRECT prediction ==== ")
    else:
        print("  ==== INCORRECT prediction ==== ")
    print("  predicted y (y_hat): ")
    print(match[5])
    print()

#### dataset / targets
X: possible inputs of a logic function.
t: dictionary with possible outputs for each logic gates. 
    4 binary ouputs to match NN's output probabilities of 0 or 1. 
    - if [p(0) p(1)] == [1 0] then probability of 0 == 1 && probability of 1 == 1

In [2]:
X = np.array([[0,0],\
              [0,1],\
              [1,0],\
              [1,1]], dtype=np.float32)

t = { #dictionary for getting both the target logic values and the correlated string 
    # binary labels to represent the probabilities of 1 or 0 (first column is 0, 2nd 1)
    "AND": np.array([[1, 0],\
                     [1, 0],\
                     [1, 0],\
                     [0, 1]], dtype=np.float32),
    
    "NAND": np.array([[0, 1],\
                      [0, 1],\
                      [0, 1],\
                      [1, 0]], dtype=np.float32),
    
    "OR": np.array([[1, 0],\
                    [0, 1],\
                    [0, 1],\
                    [0, 1]], dtype=np.float32),
    
    "NOR": np.array([[0, 1],\
                     [1, 0],\
                     [1, 0],\
                     [1, 0]], dtype=np.float32),
    
    "XOR": np.array([[1, 0],\
                     [0, 1],\
                     [0, 1],\
                     [1, 0]], dtype=np.float32) }

NameError: name 'np' is not defined

## NN run

The Neural Network can be run with train() and then feeding the train_package to predict()

In [None]:
def f(x):
    return x*x

if __name__ == '__main__':
    pool = mp.Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print(pool.map(f, range(10)))

    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print(i)