# Alex Salem
# CS 559: Machine Learning
# Final Project: Implementation of Multilayer Feed-Forward Neural Network with Back-Propagation

# Loading Packages

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import random
from sklearn import datasets
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix

# Iris Data Exploration

In [None]:
iris = datasets.load_iris()
X = iris.data[:, :2]  # we only take the first two features.
y = iris.target

plt.figure(2, figsize=(8, 6))
plt.clf()

# Plot the training points
plt.scatter(X[:, 0], X[:, 1], c=y,
            edgecolor='k')
plt.xlabel('Sepal length')
plt.ylabel('Sepal width')

plt.legend()
plt.title("Iris Dataset: Sepal length and width for the three types")
plt.savefig("iris_data.png")

# Single hidden layer, any number of output nodes

Below are helper functions I use.

In [None]:
def append_bias_term(matrix):
    new_col = np.zeros([len(matrix),1])
    new_col+=1
    return np.hstack((new_col, matrix))

def sigmoid(x_vector):
    res = 1/(1+np.exp(-x_vector))
    return res

def predict_class(output_vector):
    class_outputs = []
    for elt in output_vector:
        elt = list(elt)
        class_outputs.append(elt.index(max(elt)))
    return class_outputs

def standardize_matrix_x_data(mat_x):
    copy = mat_x
    list_of_cols = []
    for col in copy.T:
        x_min = np.min(col)
        x_max = np.max(col)
        new_col = (col-x_min)/(x_max-x_min)
        list_of_cols.append(new_col)
    new_mat = np.array(np.array(list_of_cols))
    new_mat = new_mat.T
    return new_mat

def initialize_network(X, y, num_hidden, num_hidden_layers):
    hidden_layers_weights = []
    for i in range(num_hidden_layers):
        if len(hidden_layers_weights)==0:
            hidden_weights = 2*(np.random.random((X.shape[1] + 1, num_hidden)))-1
            hidden_layers_weights.append(hidden_weights)
        else:
            hidden_weights = 2*(np.random.random((num_hidden+1, num_hidden)))-1
            hidden_layers_weights.append(hidden_weights)
    output_weights = 2*(np.random.random((num_hidden + 1, y.shape[1])))-1
    return hidden_layers_weights, output_weights

First 1-layer implementation:

In [None]:
np.random.seed(1)

# learning rate
alpha = .1

# number of nodes in the hidden layer
num_hidden = 3

# inputs
X = iris.data#[:, :2]  # we only take the first two features.
X = standardize_matrix_x_data(X)
iris_target_copy = iris.target
if iris_target_copy[0] == 0:
    new_iris_target = np.array([[1,0,0]])
elif iris_target_copy[0] == 1:
    new_iris_target = np.array([[0,1,0]])
else:
    new_iris_target = np.array([[0,0,1]])
for i in range(1, len(iris_target_copy)):
    if iris_target_copy[i] == 0:
        new_iris_target = np.append(new_iris_target, np.array([[1,0,0]]), axis=0)
    elif iris_target_copy[i] == 1:
        new_iris_target = np.append(new_iris_target, np.array([[0,1,0]]), axis=0)
    else:
        new_iris_target = np.append(new_iris_target, np.array([[0,0,1]]), axis=0)
#y = np.array([iris.target]).T
y = new_iris_target
X_with_bias = append_bias_term(X)

hidden_weights = 2*(np.random.random((X.shape[1] + 1, num_hidden)))-1
output_weights = 2*(np.random.random((num_hidden + 1, y.shape[1])))-1
print("hidden weights")
print(hidden_weights)
print("output_weights")
print(output_weights)

num_iterations = 10000

for i in range (0, num_iterations):
    #forward pass
    #X with bias times hidden initial
    #then sigmoid
    input_matrix = X_with_bias
    hidden_output = append_bias_term(sigmoid(np.matmul(input_matrix, hidden_weights)))
    output_output = sigmoid(np.matmul(hidden_output, output_weights))
    
    #backward pass
    output_error_delta_1_m = (output_output) *(1-output_output)* (output_output - y) 
    hidden_error_delta_j_k = (hidden_output[:, 1:] * (1.0-hidden_output[:, 1:])) * np.matmul(output_error_delta_1_m, output_weights.T[:,1:])
    
    #calculate partial derivatives
    #output_error_pd = np.matmul(hidden_output.T, output_error_delta_1_m)
    #hidden_error_pd = np.matmul(output_output.T, hidden_error_delta_j_k)
    
    hidden_pd = input_matrix[:, :, np.newaxis] * hidden_error_delta_j_k[: , np.newaxis, :]
    output_pd = hidden_output[:, :, np.newaxis] * output_error_delta_1_m[:, np.newaxis, :]
    #hidden_pds = []
    #output_pds = []
    
    #for i in range(0,len(X)):
    #    hidden_pds.append(np.matmul(input_matrix[i][:,np.newaxis], hidden_error_delta_j_k[i][np.newaxis, :]))
    #    output_pds.append(np.matmul(hidden_output[i][:,np.newaxis], output_error_delta_1_m[i][np.newaxis, :]))
        
    #print(hidden_pds)
    #print(output_pds)
    
    #hidden_pd_average = hidden_pds[0]
    #for x in hidden_pds[1:]:
    #    hidden_pd_average+=x
    #hidden_pd_average = hidden_pd_average/len(hidden_pds) 
    
    #output_pd_average = output_pds[0]
    #for x in output_pds[1:]:
    #    output_pd_average+=x
    #output_pd_average = output_pd_average/len(output_pds)
    
    hidden_pd_average = np.average(hidden_pd, axis=0)
    output_pd_average = np.average(output_pd, axis=0)

    #print(output_pd_average)
    #update weights
    hidden_weights += - alpha * hidden_pd_average
    output_weights += - alpha * output_pd_average
    
#print(output_output)

#next need to generalize for many hidden layers and many ouputs
#need to limit to two classes

predict_class(output_output)

# Trying Xor problem with Logistic Regression

In [None]:
X = np.array([
    [-1, 1],
    [1, -1],
    [-1, -1],
    [1, 1]
])

y = np.array([0, 0, 1, 1])
clf = LogisticRegression(solver='liblinear', 
                         max_iter=800).fit(X, y)

In [None]:
clf.predict(X)

# Testing 2-layer by hand

BELOW WORKS DO NOT CHANGE

In [None]:
#Data and seeds
# choose a random seed for reproducible results
np.random.seed(1)

# learning rate
alpha = .1

# number of nodes in the hidden layer
num_hidden = 3

X = iris.data
X = standardize_matrix_x_data(X)
iris_target_copy = iris.target
if iris_target_copy[0] == 0:
    new_iris_target = np.array([[1,0,0]])
elif iris_target_copy[0] == 1:
    new_iris_target = np.array([[0,1,0]])
else:
    new_iris_target = np.array([[0,0,1]])
for i in range(1, len(iris_target_copy)):
    if iris_target_copy[i] == 0:
        new_iris_target = np.append(new_iris_target, np.array([[1,0,0]]), axis=0)
    elif iris_target_copy[i] == 1:
        new_iris_target = np.append(new_iris_target, np.array([[0,1,0]]), axis=0)
    else:
        new_iris_target = np.append(new_iris_target, np.array([[0,0,1]]), axis=0)
#y = np.array([iris.target]).T
y = new_iris_target
X_with_bias = append_bias_term(X)


hidden_weights_0 = 2*(np.random.random((X.shape[1] + 1, num_hidden)))-1
hidden_weights_1 = 2*(np.random.random((num_hidden + 1, num_hidden)))-1
output_weights = 2*(np.random.random((num_hidden + 1, y.shape[1])))-1

num_iterations = 10000

for i in range (0, num_iterations):
    #forward pass
    input_matrix = X_with_bias
    hidden_output_0 = append_bias_term(sigmoid(np.matmul(input_matrix, hidden_weights_0)))
    hidden_output_1 = append_bias_term(sigmoid(np.matmul(hidden_output_0, hidden_weights_1)))
    output_output = sigmoid(np.matmul(hidden_output_1, output_weights))

    #backward pass
    output_error_delta_1_m = (output_output) *(1-output_output)* (output_output - y)
    hidden_error_delta_1_2 = (hidden_output_1[:, 1:] * (1.0-hidden_output_1[:, 1:])) * np.matmul(output_error_delta_1_m, output_weights.T[:,1:])
    hidden_error_delta_0_1 = (hidden_output_0[:, 1:] * (1.0-hidden_output_0[:, 1:])) * np.matmul(hidden_error_delta_1_2, hidden_weights_1.T[:,1:])
    
    #calculate partial derivatives
    hidden_pd_0 = input_matrix[:, :, np.newaxis] * hidden_error_delta_0_1[: , np.newaxis, :]
    hidden_pd_1 = hidden_output_0[:, :, np.newaxis] * hidden_error_delta_1_2[:, np.newaxis, :]
    output_pd = hidden_output_1[:, :, np.newaxis] * output_error_delta_1_m[:, np.newaxis, :]
    
    hidden_pd_0_average = np.average(hidden_pd_0, axis=0)
    hidden_pd_1_average = np.average(hidden_pd_1, axis=0)
    output_pd_average = np.average(output_pd, axis=0)

    #print(output_pd_average)
    #update weights
    hidden_weights_0 += - alpha * hidden_pd_0_average
    hidden_weights_1 += - alpha * hidden_pd_1_average
    output_weights += - alpha * output_pd_average
    
#print(output_output)

#next need to generalize for many hidden layers and many ouputs
#need to limit to two classes

predict_class(output_output)


# Generalized and Final Version

In [None]:
def run_net(X_train_data, X_test_data, y_train_data, y_test_data, num_iterations, hidden_weights, output_weights, alpha):
    MSE_train = []
    MSE_test = []
    for i in range (0, num_iterations):
        #print(i)
        if i!=0 and float(i)% 100 == 0.0:
            #print(i)
            MSE_train_ex = mean_squared_error(y_train_data, output_output)
            #print(MSE_train_ex)
            MSE_train.append(MSE_train_ex)
            input_matrix = append_bias_term(X_test_data)
            hidden_outputs = []
            for i in range(0, len(hidden_weights)):
                if len(hidden_outputs) == 0:
                    prev = input_matrix
                else:
                    prev = hidden_outputs[-1]
                hidden_output = append_bias_term(sigmoid(np.matmul(prev, hidden_weights[i])))
                hidden_outputs.append(hidden_output)
            output_output = sigmoid(np.matmul(hidden_outputs[-1], output_weights))
            MSE_test_ex = mean_squared_error(y_test_data, output_output)
            MSE_test.append(MSE_test_ex)
        #forward pass general
        input_matrix = append_bias_term(X_train_data)
        hidden_outputs = []
        for i in range(0, len(hidden_weights)):
            if len(hidden_outputs) == 0:
                prev = input_matrix
            else:
                prev = hidden_outputs[-1]
            hidden_output = append_bias_term(sigmoid(np.matmul(prev, hidden_weights[i])))
            hidden_outputs.append(hidden_output)
        output_output = sigmoid(np.matmul(hidden_outputs[-1], output_weights))

        #backward pass general
        output_error_delta_1_m = (output_output) *(1-output_output)* (output_output - y_train_data)
        hidden_error_deltas = []
        for i in list(reversed(range(0, len(hidden_outputs)))):
            hidden_output_now = hidden_outputs[i]
            if i == len(hidden_outputs)-1:
                prev_error = output_error_delta_1_m
                prev_weights = output_weights
                hidden_error_delta_j_k = (hidden_output_now[:, 1:] * (1.0-hidden_output_now[:, 1:])) * np.matmul(prev_error, prev_weights[1:, :].T)
            else:
                prev_error = hidden_error_deltas[-1]
                prev_weights = hidden_weights[i+1]
                hidden_error_delta_j_k = (hidden_output_now[:, 1:] * (1.0-hidden_output_now[:, 1:])) * np.matmul(prev_error, prev_weights[1:,:].T)        
            hidden_error_deltas.append(hidden_error_delta_j_k)

        hidden_error_deltas_correct_order = list(reversed(hidden_error_deltas))

        #calculate partial derivatives general
        hidden_pds = []
        hidden_pd_now = input_matrix[:, :, np.newaxis] * hidden_error_deltas_correct_order[0][: , np.newaxis, :]
        hidden_pds.append(hidden_pd_now)
        for i in range(0, len(hidden_outputs)-1):
            prev_output = hidden_outputs[i][:, :, np.newaxis]
            prev_error = hidden_error_deltas_correct_order[i+1][: , np.newaxis, :]
            hidden_pd_now = prev_output * prev_error
            hidden_pds.append(hidden_pd_now)
        output_pd = hidden_outputs[-1][:, :, np.newaxis] * output_error_delta_1_m[:, np.newaxis, :]

        #average partial derivatives:
        average_hidden_pds = []
        for hidden_pd_now in hidden_pds:
            average_hidden_pd = np.average(hidden_pd_now, axis=0)
            average_hidden_pds.append(average_hidden_pd)
        average_output_pd = np.average(output_pd, axis = 0)

        #update weights general
        for i in range(len(hidden_weights)):
            hidden_weights[i] += - alpha * average_hidden_pds[i]
        output_weights += - alpha * average_output_pd

    train_output = output_output
    #forward pass general
    input_matrix = append_bias_term(X_test_data)
    hidden_outputs = []
    for i in range(0, len(hidden_weights)):
        if len(hidden_outputs) == 0:
            prev = input_matrix
        else:
            prev = hidden_outputs[-1]
        hidden_output = append_bias_term(sigmoid(np.matmul(prev, hidden_weights[i])))
        hidden_outputs.append(hidden_output)
    output_output = sigmoid(np.matmul(hidden_outputs[-1], output_weights))
    test_output = output_output
                                                                      
    return train_output, test_output, MSE_train, MSE_test
                                                                    

# Testing with Iris

In [None]:
np.random.seed(1)
X = iris.data
y = iris.target
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.2)

new_X_train = standardize_matrix_x_data(X_train)
new_X_test = standardize_matrix_x_data(X_test)

iris_target_copy = y_train
if iris_target_copy[0] == 0:
    new_iris_target = np.array([[1,0,0]])
elif iris_target_copy[0] == 1:
    new_iris_target = np.array([[0,1,0]])
else:
    new_iris_target = np.array([[0,0,1]])
for i in range(1, len(iris_target_copy)):
    if iris_target_copy[i] == 0:
        new_iris_target = np.append(new_iris_target, np.array([[1,0,0]]), axis=0)
    elif iris_target_copy[i] == 1:
        new_iris_target = np.append(new_iris_target, np.array([[0,1,0]]), axis=0)
    else:
        new_iris_target = np.append(new_iris_target, np.array([[0,0,1]]), axis=0)
new_y_train = new_iris_target


iris_target_copy = y_test
if iris_target_copy[0] == 0:
    new_iris_target = np.array([[1,0,0]])
elif iris_target_copy[0] == 1:
    new_iris_target = np.array([[0,1,0]])
else:
    new_iris_target = np.array([[0,0,1]])
for i in range(1, len(iris_target_copy)):
    if iris_target_copy[i] == 0:
        new_iris_target = np.append(new_iris_target, np.array([[1,0,0]]), axis=0)
    elif iris_target_copy[i] == 1:
        new_iris_target = np.append(new_iris_target, np.array([[0,1,0]]), axis=0)
    else:
        new_iris_target = np.append(new_iris_target, np.array([[0,0,1]]), axis=0)
#y = np.array([iris.target]).T
new_y_test = new_iris_target

In [None]:
np.random.seed(1)
hidden_weights, output_weights = initialize_network(new_X_train, new_y_train, 3, 1)
output_train, output_test, MSE_train_res, MSE_test_res = run_net(new_X_train, new_X_test, new_y_train, new_y_test, 6000, hidden_weights, output_weights, .3)

In [None]:
plt.plot(MSE_train_res, 'ro', label = "MSE train")
plt.plot(MSE_test_res, 'bx', label = "MSE test")
plt.xlabel("Iterations/100")
plt.ylabel("MSE")
plt.title("MSE for 2-layer NN on Iris Data")
plt.legend()
#plt.savefig("2_layer_iris.png")

In [None]:
y_train_predicted = predict_class(output_train)
y_test_predicted = predict_class(output_test)

Confusion matrix for 2-layer NN, training data

In [None]:
confusion_matrix(y_train_predicted, 
                 list(y_train))

Confusion matrix for 2-layer NN, test data

In [None]:
confusion_matrix(y_test_predicted, 
                 list(y_test))

# Testing with XOR

In [None]:
X = np.array([
    [-1, 1],
    [1, -1],
    [-1, -1],
    [1, 1]
])

y = np.array([[1,0], 
              [1,0], 
              [0,1],
              [0,1]])

hidden_weights, output_weights = initialize_network(X, y, 3, 2)
output_train, output_test, MSE_train_res, MSE_test_res = run_net(X, y, X, y, 
                                                                 10000, hidden_weights, output_weights, .2)

In [None]:
predict_class(output_test)

In [None]:
X = np.array([
    [-1, 1],
    [1, -1],
    [-1, -1],
    [1, 1]
])

y = np.array([0, 0, 1, 1])
clf = LogisticRegression(random_state=0, 
                         solver="lbfgs").fit(X, y)
clf.predict(X)

In [None]:
np.random.seed(1)
X = iris.data
y = iris.target
X_train,X_test,y_train,y_test=train_test_split(X,y,test_size=0.2)

new_X_train = standardize_matrix_x_data(X_train)
new_X_test = standardize_matrix_x_data(X_test)

clf = LogisticRegression(random_state=0, solver="lbfgs", 
                         multi_class='auto', max_iter=200).fit(X_train, y_train)
y_test_predicted = clf.predict(X_test)

In [None]:
confusion_matrix(y_test_predicted, list(y_test))