<a href="https://colab.research.google.com/github/Zhengro/DL-Identification/blob/jaume/SOM_algorithm.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import numpy as np

__author__ = 'Jaume Anguera Peris'

class SOM:
    """Self-organising map laid out on a rectangular 2-D grid of nodes.

    Every grid node ``(x, y)`` owns one weight vector with ``num_features``
    entries.  All weight vectors live in ``net_weights``, an array of shape
    ``(num_features, net_dim_x, net_dim_y)`` filled with uniform random
    values in ``[0, 1)`` at construction time.

    Distances used throughout this class are *squared* Euclidean distances;
    they are compared against ``radius ** 2`` and fed to a Gaussian-shaped
    neighbourhood function, so no square root is ever taken.
    """

    DEFAULT_NUM_EPOCHS = 200
    DEFAULT_INIT_LEARN_RATE = 0.4

    # Initialization
    def __init__(self, net_dim_x, net_dim_y, num_features,
                 num_epochs=DEFAULT_NUM_EPOCHS,
                 init_learning_rate=DEFAULT_INIT_LEARN_RATE):
        """Create the grid and draw random initial weights.

        Parameters
        ----------
        net_dim_x, net_dim_y : int
            Number of nodes along each grid axis (both must be >= 1).
        num_features : int
            Length of every weight vector / input sample (must be >= 1).
        num_epochs : int
            Number of passes ``train`` makes over the data.
        init_learning_rate : float
            Learning rate used in the first epoch.

        Raises
        ------
        ValueError
            If a grid dimension or ``num_features`` is smaller than 1.
        """
        if min(net_dim_x, net_dim_y) < 1 or num_features < 1:
            raise ValueError(
                "net_dim_x, net_dim_y and num_features must all be >= 1")

        self.net_dimensions = np.array([net_dim_x, net_dim_y])
        self.num_features = num_features
        self.num_epochs = num_epochs
        self.init_learning_rate = init_learning_rate
        self.init_radius = min(self.net_dimensions[0], self.net_dimensions[1])

        # The radius decays as init_radius * exp(-t / time_constant), which
        # reaches 1 at t == num_epochs.  When the radius already starts at 1,
        # log(1) == 0 would make this a division by zero; an infinite time
        # constant expresses the same "never shrinks" behaviour cleanly.
        if self.init_radius > 1:
            self.time_constant = num_epochs / np.log(self.init_radius)
        else:
            self.time_constant = np.inf

        self.generate_weight_matrix()

    # Functions
    def generate_weight_matrix(self):
        """(Re)initialise ``net_weights`` with uniform random values in [0, 1)."""
        shape = (self.num_features,
                 self.net_dimensions[0],
                 self.net_dimensions[1])
        self.net_weights = np.random.random(shape)

    def train(self, inputData, verbose=True):
        """Fit the map to ``inputData`` in place.

        For every epoch the radius and learning rate are decayed, the best
        matching unit (BMU) of every sample is computed once from the weights
        as they stand at the start of the epoch, and then every node is
        pulled towards each sample whose BMU lies within ``radius`` of that
        node on the grid.  Samples are applied in row order and each update
        builds on the result of the previous one.

        Parameters
        ----------
        inputData : array-like, shape (num_samples, num_features)
            Training samples, one per row.
        verbose : bool
            When truthy, progress is printed every 10 epochs.
        """
        # Bound unconditionally so any truthy ``verbose`` value works.
        msg_interval = 10
        inputData = np.asarray(inputData)

        for epoch in range(self.num_epochs):
            radius = self.decay_radius(epoch)
            learning_rate = self.decay_learning_rate(epoch)

            if verbose and (epoch % msg_interval == 0):
                print("Iteration %d out of %d" % (epoch, self.num_epochs))
                print("Radius = %.2f" % radius)
                print("Learning rate = %.2e\n" % learning_rate)

            # One (x, y) grid position per sample; fixed for the whole epoch.
            bmus = np.asarray(self.predict_cluster(inputData)).reshape(-1, 2)
            radius_sq = radius ** 2

            for x in range(self.net_dimensions[0]):
                for y in range(self.net_dimensions[1]):
                    # Squared grid distance from this node to every BMU.
                    offsets = bmus - np.array([x, y])
                    grid_dist_sq = np.sum(offsets ** 2, axis=1)

                    in_range = np.flatnonzero(grid_dist_sq <= radius_sq)
                    if in_range.size == 0:
                        continue

                    step_sizes = learning_rate * SOM.neighborhood_influence(
                        grid_dist_sq[in_range], radius)

                    # Work on a private copy and write it back once; each
                    # sample's update starts from the previous sample's
                    # result, exactly as if the grid were updated in between.
                    weight = self.net_weights[:, x, y].copy()
                    for sample_ind, step_size in zip(in_range, step_sizes):
                        weight += step_size * (inputData[sample_ind] - weight)
                    self.net_weights[:, x, y] = weight

    def predict_cluster(self, inputData, saveData=False, fileName='cluster_ind.txt'):
        """Return the BMU grid position of every row of ``inputData``.

        Parameters
        ----------
        inputData : array-like, shape (num_samples, num_features)
            Samples to map onto the grid, one per row.
        saveData : bool
            When truthy, the positions are also written to ``fileName`` with
            ``np.savetxt``, one ``[x,y]`` pair per line.
        fileName : str
            Destination used when ``saveData`` is set.

        Returns
        -------
        list of numpy.ndarray
            One length-2 integer array ``[x, y]`` per input row.
        """
        cluster_ind = [self.find_bmu(sample) for sample in np.asarray(inputData)]

        if saveData:
            np.savetxt(fileName, cluster_ind, fmt='[%d,%d]', delimiter=',')

        return cluster_ind

    def decay_radius(self, iteration):
        """Return the neighbourhood radius for the given epoch index."""
        return self.init_radius * np.exp(-iteration / self.time_constant)

    def decay_learning_rate(self, iteration):
        """Return the learning rate for the given epoch index."""
        return self.init_learning_rate * np.exp(-iteration / self.num_epochs)

    @staticmethod
    def euclidean_dist(first_vector, second_vector):
        """Return the *squared* Euclidean distance between two vectors.

        No square root is taken: the value is the sum of squared element-wise
        differences over all elements of the broadcast result.
        """
        return np.sum((first_vector - second_vector) ** 2)

    @staticmethod
    def neighborhood_influence(distance, radius):
        """Return ``exp(-distance / (2 * radius ** 2))``.

        ``distance`` is expected to be a squared distance (as produced by
        ``euclidean_dist``); scalars and arrays are both accepted.
        """
        return np.exp(-distance / (2 * (radius ** 2)))

    def find_bmu(self, input_vec):
        """Return the grid position of the node closest to ``input_vec``.

        Closeness is the squared Euclidean distance between ``input_vec`` and
        each node's weight vector.  On ties the first node in row-major
        order (``x`` outer, ``y`` inner) wins.

        Parameters
        ----------
        input_vec : array-like with ``num_features`` elements

        Returns
        -------
        numpy.ndarray
            Length-2 integer array ``[x, y]``.
        """
        # Shape (num_features, 1, 1) broadcasts against the whole weight grid.
        sample = np.asarray(input_vec).reshape(self.num_features, 1, 1)
        dist_sq = np.sum((self.net_weights - sample) ** 2, axis=0)

        # argmin scans the flattened (x, y) grid in row-major order and
        # returns the first minimum, which gives the tie-break documented
        # above.
        flat_index = np.argmin(dist_sq)
        return np.array(np.unravel_index(flat_index, dist_sq.shape))
