# FNN Zalando


In [43]:
import numpy
# scipy.special for the sigmoid function expit()
import scipy.special
# library for plotting arrays
import matplotlib.pyplot
# ensure the plots are inside this notebook, not an external window
%matplotlib inline

In [44]:
# neural network class definition
class neuralNetwork:
    
    
    # initialise the neural network
    def __init__(self, inputnodes, hiddennodes, outputnodes, learningrate):
        # set number of nodes in each input, hidden, output layer
        self.inodes = inputnodes
        self.hnodes = hiddennodes
        self.onodes = outputnodes
        
        # link weight matrices, wih and who
        # weights inside the arrays are w_i_j, where link is from node i to node j in the next layer
        # w11 w21
        # w12 w22 etc 
        self.wih = numpy.random.normal(0.0, pow(self.inodes, -0.5), (self.hnodes, self.inodes))
        self.who = numpy.random.normal(0.0, pow(self.hnodes, -0.5), (self.onodes, self.hnodes))

        # learning rate
        self.lr = learningrate
        
        # activation function is the sigmoid function
        self.activation_function = lambda x: scipy.special.expit(x)
        
        pass

    
    # train the neural network
    def train(self, inputs_list, targets_list):
        # convert inputs list to 2d array
        inputs = numpy.array(inputs_list, ndmin=2).T
        targets = numpy.array(targets_list, ndmin=2).T
        
        # calculate signals into hidden layer
        hidden_inputs = numpy.dot(self.wih, inputs)
        # calculate the signals emerging from hidden layer
        hidden_outputs = self.activation_function(hidden_inputs)
        
        # calculate signals into final output layer
        final_inputs = numpy.dot(self.who, hidden_outputs)
        # calculate the signals emerging from final output layer
        final_outputs = self.activation_function(final_inputs)
        
        # output layer error is the (target - actual)
        output_errors = targets - final_outputs
        # hidden layer error is the output_errors, split by weights, recombined at hidden nodes
        hidden_errors = numpy.dot(self.who.T, output_errors) 
        
        # update the weights for the links between the hidden and output layers
        self.who += self.lr * numpy.dot((output_errors * final_outputs * (1.0 - final_outputs)), numpy.transpose(hidden_outputs))
        
        # update the weights for the links between the input and hidden layers
        self.wih += self.lr * numpy.dot((hidden_errors * hidden_outputs * (1.0 - hidden_outputs)), numpy.transpose(inputs))
        
        pass

    
    # query the neural network
    def query(self, inputs_list):
        # convert inputs list to 2d array
        inputs = numpy.array(inputs_list, ndmin=2).T
        
        # calculate signals into hidden layer
        hidden_inputs = numpy.dot(self.wih, inputs)
        # calculate the signals emerging from hidden layer
        hidden_outputs = self.activation_function(hidden_inputs)
        
        # calculate signals into final output layer
        final_inputs = numpy.dot(self.who, hidden_outputs)
        # calculate the signals emerging from final output layer
        final_outputs = self.activation_function(final_inputs)
        
        return final_outputs

In [45]:
# number of input, hidden and output nodes
input_nodes = 784
hidden_nodes = 100
output_nodes = 1

# learning rate
learning_rate = 0.05

# create instance of neural network
n = neuralNetwork(input_nodes,hidden_nodes,output_nodes, learning_rate)

In [46]:
with open("data/fashion-mnist_train.csv", 'r') as training_data_file:
    training_data_list = training_data_file.readlines()[1:]

In [47]:
# epochs is the number of times the training data set is used for training
epochs = 1

# counter for skipped records
skipped_records = 0

for e in range(epochs):
    # go through all records in the training data set
    for record in training_data_list:
        # split the record by the ',' commas
        all_values = record.split(',')
        # check if all values are present
        if len(all_values) < 2 or '' in all_values:
            skipped_records += 1
            continue
        # scale and shift the inputs
        inputs = (numpy.asarray(all_values[1:], dtype=float) / 255.0 * 0.99) + 0.01
        # create the target output value (0.01 or 0.99)
        target = numpy.array([0.99 if int(all_values[0]) == 1 else 0.01])


In [48]:
print(f'Skipped records: {skipped_records}/{len(training_data_list)}')

Skipped records: 0/60000


In [49]:
with open("data/fashion-mnist_test.csv", 'r') as test_data_file:
    test_data_list = test_data_file.readlines()[1:]

In [50]:
# test the neural network

# scorecard for how well the network performs, initially empty
scorecard = []

# counter for skipped records
skipped_records = 0

# go through all the records in the test data set
for record in test_data_list:
    # split the record by the ',' commas
    all_values = record.split(',')
    # check if all values are present
    if len(all_values) != 785:  # 1 label + 784 features
        skipped_records += 1
        continue
    # correct answer is first value
    correct_label = int(all_values[0])
    # scale and shift the inputs
    inputs = (numpy.asarray(all_values[1:], dtype=float) / 255.0 * 0.99) + 0.01
    # query the network
    outputs = n.query(inputs)
    # the output is a single value, so we compare it directly
    predicted_label = 1 if outputs[0] > 0.5 else 0
    # append correct or incorrect to list
    if predicted_label == correct_label:
        # network's answer matches correct answer, add 1 to scorecard
        scorecard.append(1)
    else:
        # network's answer doesn't match correct answer, add 0 to scorecard
        scorecard.append(0)

In [51]:
# calculate the performance score, which is the ratio of correct answers
performance = sum(scorecard) / len(scorecard)
print(f'Performance: {performance}')
print(f'Skipped records: {skipped_records}/{len(test_data_list)}')

Performance: 0.1037
Skipped records: 0/10000


In [52]:
# Erstellen Sie eine Liste von 10 Modellen
models = []
for i in range(10):
    models.append(neuralNetwork(input_nodes, hidden_nodes, output_nodes, learning_rate))

In [53]:
# Trainieren Sie jedes Modell auf die jeweilige Klasse gegen den Rest
for i in range(10):
    for e in range(epochs):
        for record in training_data_list:
            all_values = record.split(',')
            if len(all_values) < 2 or '' in all_values:
                skipped_records += 1
                continue
            inputs = (numpy.asarray(all_values[1:], dtype=float) / 255.0 * 0.99) + 0.01
            target = numpy.array([0.99 if int(all_values[0]) == i else 0.01])
            models[i].train(inputs, target)

In [54]:
# Testen Sie jedes Bild mit allen 10 Modellen
scorecard = []
for record in test_data_list:
    all_values = record.split(',')
    if len(all_values) != 785:
        skipped_records += 1
        continue
    correct_label = int(all_values[0])
    inputs = (numpy.asarray(all_values[1:], dtype=float) / 255.0 * 0.99) + 0.01
    outputs = [model.query(inputs) for model in models]
    predicted_labels = [i for i, output in enumerate(outputs) if output[0] > 0.5]
    if correct_label in predicted_labels:
        scorecard.append(1)
    else:
        scorecard.append(0)

In [55]:
# Berechnen Sie die Leistung
performance = sum(scorecard) / len(scorecard)
print(f'Performance: {performance}')
print(f'Skipped records: {skipped_records}/{len(test_data_list)}')

Performance: 0.8076
Skipped records: 0/10000


In [57]:
# Create a list of 10 ensembles, each containing 10 models
ensembles = []
for _ in range(10):
    ensemble = []
    for i in range(10):
        ensemble.append(neuralNetwork(input_nodes, hidden_nodes, output_nodes, learning_rate))
    ensembles.append(ensemble)

In [58]:
# Train each model in each ensemble on the respective class against the rest
for ensemble in ensembles:
    for i in range(10):
        for e in range(epochs):
            for record in training_data_list:
                all_values = record.split(',')
                if len(all_values) < 2 or '' in all_values:
                    skipped_records += 1
                    continue
                inputs = (numpy.asarray(all_values[1:], dtype=float) / 255.0 * 0.99) + 0.01
                target = numpy.array([0.99 if int(all_values[0]) == i else 0.01])
                ensemble[i].train(inputs, target)

In [60]:
# Test each image with all models in all ensembles
scorecard = []
for record in test_data_list:
    all_values = record.split(',')
    if len(all_values) != 785:
        skipped_records += 1
        continue
    correct_label = int(all_values[0])
    inputs = (numpy.asarray(all_values[1:], dtype=float) / 255.0 * 0.99) + 0.01
    ensemble_outputs = []
    for ensemble in ensembles:
        outputs = [model.query(inputs) for model in ensemble]
        predicted_labels = [i for i, output in enumerate(outputs) if output[0] > 0.5]
        ensemble_outputs.append(predicted_labels)
    # Aggregate predictions from all ensembles
    all_predictions = sum(ensemble_outputs, [])
    if all_predictions:
        final_prediction = max(set(all_predictions), key=all_predictions.count)
        if correct_label == final_prediction:
            scorecard.append(1)
        else:
            scorecard.append(0)
    else:
        # Handle the case where no model predicts a label with confidence > 0.5
        scorecard.append(0)

In [61]:
# Calculate performance
performance = sum(scorecard) / len(scorecard)
print(f'Performance: {performance}')
print(f'Skipped records: {skipped_records}/{len(test_data_list)}')

Performance: 0.8016
Skipped records: 0/10000
