In [1]:
import numpy as np
import math
import random
import itertools
from sklearn.datasets import load_iris
from scipy.spatial import distance
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.datasets import fetch_mldata
import json

In [2]:
def normalize_vector(vector):
    '''
    Returns the normalized vector of the provided one, a length 1
    vector with the same direction as the original.
    '''
    vector = np.array(vector)
    return (vector/math.sqrt(np.sum(pow(vector, 2)))).tolist()

In [3]:
def extended_normalization(vector):
    '''
    When two vectors of different lengths but in the same direction are normalized, 
    the result will be the same for both of them.
    
    In order to avoid collisions of normalized vectors, an extended normalization can
    be applied, where an extra dimension is added to the vector to be normalized. By
    adding a component of length 1 in the new dimension and normalizing the result,
    it is ensured that no two vectors which were originally in the same dimensions
    will produce the same output.
    '''
    vector = np.append(vector, 1)
    return (vector/math.sqrt(np.sum(pow(vector, 2)))).tolist()

In [4]:
def get_random_unit_vector(components):
    '''
    Returns a random vector of length 1 with the required number of components
    '''
    return normalize_vector([random.random() for i in range(components)])

In [5]:
class SOM:
    def __init__ (self, dimensions, topology='rectangular', max_iter=100, 
                  initial_learning_rate=0.0001, initial_neighborhood_radius=0,
                 labels=[], output_layer=[]):
        '''
        dimensions - Array-like, containing the size of the output
        layer in each dimension.
        
        topology - The neighborhood relation between output-layer neurons.
        The default topology is rectangular. A future improvement would be
        to implement a hexagonal topology option.
        
        max_iter - The maximum number of epochs before the algorithm stops
        
        initial_learning_rate - The, you guessed it, initial learning rate
        
        initial_neighborhood_radius - If zero, helf the size of the smallest output layer
        dimension will be used as the initial radius, else, this will be used
        
        labels - If not [], this is used instead of creating a new labels matrix.
        useful for loading previous models
        
        output_layer - If not [], the output layer is set to this
        '''
        self.dimensions = dimensions
        self.output_layer = None if output_layer==[] else output_layer
        self.max_iter = max_iter
        self.topology = topology
        self.initial_learning_rate = initial_learning_rate
        self.initial_neighborhood_radius = initial_neighborhood_radius if\
                                    initial_neighborhood_radius>0 else\
                                    math.floor((np.min(dimensions)-1)/2)
        #Output layer indexes is the cartesian product of the lists containing
        #the possible indexes to each dimension of the output layer matrix. That
        #is, it is a list containing the indexes to all the positions of the 
        #output layer. It is used to iterate over the output layer, since the 
        #number of dimensions it will have is unknown before the execution
        indexes_list = [list(range(i)) for i in self.dimensions]
        self.output_layer_indexes = list(itertools.product(*indexes_list))
        #The labels for each output neuron, meaning that an instance that has 
        #a neuron as its BMU will be classified using the label corresponding to 
        #that neuron. The matrix contains tuples indicating the label and the
        #distance to the closest match there has been for that neuron. The label
        #corresponds to the class of that match. If a closer match is found, the
        #label is updated.
        #Although all labels are initialized to zero, an infinite distance ensures
        #all will be updated when the training begins
        if labels == []:
            self.labels = np.zeros(self.dimensions, dtype=object)
            for i in self.output_layer_indexes:
                self.labels[i]=[0, math.inf]
        else:
            self.labels = labels
        return
    @staticmethod
    def load_model(path):
        '''
        Loads the state of the model from a json file and returns a SOM instance
        '''
        f=open(path, 'r')
        string_data = f.read()
        som_data = json.loads(string_data)
        f.close
        som = SOM(som_data['dimensions'],
                  output_layer = np.array(som_data['output_layer']),
                 labels = np.array(som_data['labels']),
                 max_iter = som_data['max_iter'],
                 topology = som_data['topology'],
                 initial_learning_rate = som_data['initial_learning_rate'],
                 initial_neighborhood_radius = som_data['initial_neighborhood_radius'])
        return som
    
    def save_model(self, path):
        '''
        Saves the state of the model to the file indicated by path
        '''
        f=open(path+".json", 'w')
        state={'dimensions': self.dimensions,
                'output_layer': self.output_layer.tolist(),
                'labels': self.labels.tolist(),
                'max_iter': self.max_iter,
                'topology': self.topology,
                'initial_learning_rate': self.initial_learning_rate,
                'initial_neighborhood_radius': self.initial_neighborhood_radius}
        f.write(json.dumps(state))
        f.close
        return
    def get_labels(self):
        '''
        Returns the matrix with the class associated to each output neuron
        '''
        return self.labels
    def __weight_initialization(self, num_attr):
        '''
        Initializes the output layer to a matrix with the dimensions specified
        for this map, filled with random unit vectors of dimension num_attr
        
        num_attr - The number of attributes of the instances this network will
        be trained with
        '''
        self.output_layer = np.zeros(self.dimensions, dtype=object)
        for i in self.output_layer_indexes:
            self.output_layer[i] = get_random_unit_vector(num_attr)
        #Maybe cast the output layer to a regular list here?
        return
    def __get_output(self, instance):
        '''
        Calculates the output of each neuron for the provided instance.
        Returns a matrix with the same dimensions as the output layer
        containing the outputs
        '''
        output_matrix = np.zeros(self.dimensions)
        for i in self.output_layer_indexes:
            output_matrix[i] = distance.euclidean(instance, self.output_layer[i])
        return output_matrix
    def __update_labels(self, output_matrix, instance_class):
        '''
        Update the labels matrix. If this instance is a closer match for
        a neuron than any previous one, the neuron will be labelled with
        the instace's class and the distance stored
        
        output_matrix - The distances from this instance to each output neuron
        
        instance_class - The class associated to the instance
        '''
        for i in self.output_layer_indexes:
            if output_matrix[i] < self.labels[i][1]:
                self.labels[i][0]=int(instance_class)
                self.labels[i][1]=float(output_matrix[i])
        return
    def __get_neighbors(self, index, radius):
        '''
        Returns a matrix containing the indexes of the neurons in distance radius
        to the one specified by index.
        In order to implement different possible topologies, this should have a different
        way of computing the indexes depending on that
        '''
        indexes_list = [list(range(i-math.ceil(radius/2), i+math.ceil(radius/2)+1)) for i in index]
        #Just like obtaining the indexes into the output layer, the indexes to this
        #area of the output matrix are obtained by computing the cartesian product 
        #of the indexes into each dimension, limiting the indexes to the correct range
        indexes = np.array(list(itertools.product(*indexes_list)))
        #Finally, we divide the indexes in each dimension modulo the size of that dimension,
        #in order to obtain the wrapped around indexes.
        #Indexes are casted to tuples for their use as array indexes
        return [tuple(i) for i in indexes%self.dimensions]
    
    def fit(self, X, Y):
        '''
        Initializes the output layer neurons to random unit vectors, normalizes
        the input vectors, and applies the learning algorithm to the input data, X
        
        X - The data to train the algorithm on
        
        Y - The class of each instance
        '''
        #Since extended normalization will be used, the inputs will have an extra 
        #dimension, which is why I add 1 here so the output layer vectors will have
        #the number of attributes of an instance plus 1 components
        #Weight initialization
        self.__weight_initialization(len(X[0]) + 1)
        #Input data normalization
        X = [extended_normalization(v) for v in X]
        #Training
        for epoch in range(self.max_iter):
            #print("Epoch: " + str(epoch))
            #The neighborhood radius depends on the current epoch
            neighborhood_radius = max(self.initial_neighborhood_radius-epoch, 0)
            #Compute learning rate for this iteration
            learning_rate = self.initial_learning_rate/(1+epoch)
            #In order to chose inputs at random, create an array with the indexes
            #for each instance and reorder it at random
            instance_indexes=list(range(len(X)))
            random.shuffle(instance_indexes)
            
            for instance_index in instance_indexes:
                #Chose a vector at random from the training data
                input_vector = X[instance_index]
                #Get the output for each neuron in the output layer
                output_matrix = self.__get_output(input_vector)
                #Get the index of the neuron that provides the smallest output
                #Np unravel index gives us the multi-dimensional index to a matrix
                #of the specified shape given the index into the flattened version of
                #the matrix, which np.argmin returns
                best_matching_unit = np.unravel_index(np.argmin(output_matrix, axis=None), 
                                                      output_matrix.shape)
                #This would be another way to do it, testing should be done in order 
                #to determine which of the two is the most efficient. I would think this 
                #secong one is probably better as the matrix indexes are already computed
                #best_matching_unit = self.output_layer_indexes[np.argmin(output_matrix)]

                #Obtain the indexes of the neighbors of the best matching unit within a
                #specified radius, including the BMU. Note that the indexes that go out of
                #bounds wrap around the matrix

                #This is an alternative way to compute the radius
                #The neighborhood radius decreases as the training progresses, and is 
                #computed based on the current epoch, the maximum number of epochs
                #and the initial radius (I could find no formula for doing it so I just
                #chose this as it seemed reasonable. However, I assume there are much 
                #better ways of doing it)
                #I will scale the remaining number of iterations to the range
                #[0, initial_radius], and round up the result
                #https://stats.stackexchange.com/questions/281162/scale-a-number-between-a-range
                #current_radius = ((max_epoch-current_epoch)-0/max_epoch-0)*(initial_radius-0)+0
                #neighborhood_radius = ((self.max_iter-epoch)/self.max_iter)\
                #                        *self.initial_neighborhood_radius

                neighbors = self.__get_neighbors(best_matching_unit, neighborhood_radius)

                #I will use a simple neighborhood function: 1 for the BMU's neighbors, and 
                #0 for the rest. More complex options would involve using different values
                #for the BMU's neighbors based on various factors such as their distance
                #to the BMU
                #The function I will use to update the weights makes the assumption that 
                #both the inputs and the output layer weights are normalized, which they
                #are, in this implementation:
                #w_i(t+1) = (w_i(t)+α(t)x_v)/(‖w_i(t)+α(t)x_v‖)
                #Where w_i(t) is the element i of the weights vector of the neuron
                #in iteration t, α(t) is the learning rate for iteration t, and x 
                #is the input vector. The division by its module serves to normalize
                #the vector again
                #As α(t), I will use initial_learning_rate/(1+current_epoch)

                #Update the weights of the BMU and it's neighbors
                for n in neighbors:
                    self.output_layer[n] = normalize_vector(self.output_layer[n]+
                                                            learning_rate*np.array(input_vector))
        #Update the labels matrix. 
        for instance_index in range(len(X)):
            #Get the output for the instance
            output_matrix = self.__get_output(X[instance_index])
            #Update the labels
            self.__update_labels(output_matrix, Y[instance_index])
        return
    def predict(self, X):
        '''
        Determines the Best Matching Unit for each instance in X, and returns the predicted
        class for it based on the class associated to the neuron
        '''
        #Normalize the instances in X
        X = [extended_normalization(v) for v in X]
        predictions = []
        for instance in X:
            #Get the output for each neuron in the output layer
            output_matrix = self.__get_output(instance)
            #Get the index of the BMU
            best_matching_unit = np.unravel_index(np.argmin(output_matrix, axis=None), 
                                                  output_matrix.shape)
            #Get the label associated to that neuron and store it
            predictions.append(self.labels[best_matching_unit][0])
        return predictions
    def print_map():
        '''
        Shows a graphical representation of the output layer
        '''
        return

## Iris

In [64]:
data = load_iris()["data"]
target = load_iris()["target"]
X_train, X_test, y_train, y_test = train_test_split(data, target, test_size=0.33, random_state=0)

In [None]:
som = SOM([12, 8], max_iter=100)
som.fit(X_train, y_train)
som.save_model("iris_neuron_labeling")
prediction = som.predict(X_test)
accuracy_score(y_test, prediction, normalize=True)

## MNIST

### Loading MNIST Dataset

In [122]:
#Training set images load
f = open("train-images-idx3-ubyte", 'rb')
magic_number = int.from_bytes(f.read(4), "big")
images_count = int.from_bytes(f.read(4), "big")
rows = int.from_bytes(f.read(4), "big")
cols = int.from_bytes(f.read(4), "big")
mnist_train_x = []
for image in range(images_count):
    instance = []
    for i in range(rows):
        for j in range(cols):
            instance.append(int.from_bytes(f.read(1), "big", signed=False))
    mnist_train_x.append(instance)

In [125]:
#Training set labels load
f = open("train-labels-idx1-ubyte", 'rb')
magic_number = int.from_bytes(f.read(4), "big")
items_count = int.from_bytes(f.read(4), "big")
mnist_train_y = []
for image in range(items_count):
    mnist_train_y.append(int.from_bytes(f.read(1), "big", signed=False))

In [6]:
#Test set images load
f = open("t10k-images-idx3-ubyte", 'rb')
magic_number = int.from_bytes(f.read(4), "big")
images_count = int.from_bytes(f.read(4), "big")
rows = int.from_bytes(f.read(4), "big")
cols = int.from_bytes(f.read(4), "big")
mnist_test_x = []
for image in range(images_count):
    instance = []
    for i in range(rows):
        for j in range(cols):
            instance.append(int.from_bytes(f.read(1), "big", signed=False))
    mnist_test_x.append(instance)

In [7]:
#Test set labels load
f = open("t10k-labels-idx1-ubyte", 'rb')
magic_number = int.from_bytes(f.read(4), "big")
items_count = int.from_bytes(f.read(4), "big")
mnist_test_y = []
for image in range(items_count):
    mnist_test_y.append(int.from_bytes(f.read(1), "big", signed=False))

### Training 

In [None]:
som = SOM([12, 8], max_iter=100)
som.fit(mnist_train_x, mnist_train_y)
som.save_model("mnist_neuron_labeling")
prediction = som.predict(mnist_test_x)
accuracy_score(mnist_test_y, prediction, normalize=True)

In [15]:
prediction = som.predict(mnist_test_x)

In [16]:
accuracy_score(mnist_test_y, prediction, normalize=True)

0.3215

## MNIST Saved Model

In [14]:
som=SOM.load_model("mnist_neuron_labeling.json")



In [None]:
accuracy_score(mnist_test_y, prediction, normalize=True)