# Implementation

## Github
Since our code depends largely on PySOMVis and the datasets, we decided to fork the repo:
https://github.com/Brandl/PySOMVis

All our changes remain under the Apache License 2.0

## Here is our implementation of g) Mnemonic SOM: 
Reading a black/white silhouette image to determine the shape of the SOM;
define max extension for units x and y and fit into silhouette; computing an (efficient) look-up
table for shortest distance between two units for the training process, and mapping it into a
regular SOM data structure to support the standard visualizations

In [None]:
#HitHistogram
def HitHist(_m, _n, _weights, _idata):
    hist = np.zeros(_m * _n)
    for vector in _idata:
        position =np.argmin(np.sqrt(np.sum(np.power(_weights - vector, 2), axis=1)))
        hist[position] += 1

    return hist.reshape(_m, _n)

#U-Matrix - implementation
def UMatrix(_m, _n, _weights, _dim):
    U = _weights.reshape(_m, _n, _dim)
    U = np.insert(U, np.arange(1, _n), values=0, axis=1)
    U = np.insert(U, np.arange(1, _m), values=0, axis=0)
    #calculate interpolation
    for i in range(U.shape[0]):
        if i%2==0:
            for j in range(1,U.shape[1],2):
                U[i,j][0] = np.linalg.norm(U[i,j-1] - U[i,j+1], axis=-1)
        else:
            for j in range(U.shape[1]):
                if j%2==0:
                    U[i,j][0] = np.linalg.norm(U[i-1,j] - U[i+1,j], axis=-1)
                else:
                    U[i,j][0] = (np.linalg.norm(U[i-1,j-1] - U[i+1,j+1], axis=-1) + np.linalg.norm(U[i+1,j-1] - U[i-1,j+1], axis=-1))/(2*np.sqrt(2))

    U = np.sum(U, axis=2) #move from Vector to Scalar

    for i in range(0, U.shape[0], 2): #count new values
        for j in range(0, U.shape[1], 2):
            region = []
            if j>0: region.append(U[i][j-1]) #check left border
            if i>0: region.append(U[i-1][j]) #check bottom
            if j<U.shape[1]-1: region.append(U[i][j+1]) #check right border
            if i<U.shape[0]-1: region.append(U[i+1][j]) #check upper border

            U[i,j] = np.median(region)

    return U

#SDH - implementation
def SDH(_m, _n, _weights, _idata, factor, approach):
    import heapq

    sdh_m = np.zeros( _m * _n)

    cs=0
    for i in range(factor): cs += factor-i

    for vector in _idata:
        dist = np.sqrt(np.sum(np.power(_weights - vector, 2), axis=1))
        c = heapq.nsmallest(factor, range(len(dist)), key=dist.__getitem__)
        if (approach==0): # normalized
            for j in range(factor):  sdh_m[c[j]] += (factor-j)/cs
        if (approach==1):# based on distance
            for j in range(factor): sdh_m[c[j]] += 1.0/dist[c[j]]
        if (approach==2):
            dmin, dmax = min(dist[c]), max(dist[c])
            for j in range(factor): sdh_m[c[j]] += 1.0 - (dist[c[j]]-dmin)/(dmax-dmin)

    return sdh_m.reshape(_m, _n)

In [2]:
# SOMUnit implementation for SOM
# Distance data structure is now within each SOM unit so each unit knows the distances to other units.
# The chosen data structure is a map/dict to enable fast lockup. As key, the hash value of a unit is used while the values are the distance between self and the target unit.

import math

def is_out_of_bounds(array, x, y):
    return x < 0 or y < 0 or array.shape[1] <= x or array.shape[0] <= y

class SOMUnit:

    def __init__(self, x, y, input_length, initial_learning_rate = .1, learning_rate_decay = 10, initial_radius = 1, radius_decay = 10):
        self.weights = np.random.rand(input_length)
        self.x = x
        self.y = y

        self.initial_radius = initial_radius
        self.radius_decay = (radius_decay if radius_decay != 0 else 1)
        self.learning_rate_decay = (learning_rate_decay if learning_rate_decay != 0 else 1)
        self.initial_learning_rate = initial_learning_rate
        self.unit_distances = {}

    def __hash__(self):
        return int(str(self.x) + str(self.y))

    # distances are calculated via a flooding algorithm which adds each unit to the distance data structure
    def calculate_unit_distances(self, units):
        visited_units = units == np.array(None)
        next_active_units = [self]
        visited_units[self.y, self.x] = True

        distance = 0

        # search as long as there are unvisited units
        while len(next_active_units) > 0:
            active_units = next_active_units
            next_active_units = []

            # active_units hold the units which are one step further away in comparison to the last iteration.
            while len(active_units) > 0:
                current_unit = active_units.pop(0)
                x = current_unit.x
                y = current_unit.y

                self.unit_distances[hash(current_unit)] = distance

                # expand in von-neumann-neighbourhood
                if not is_out_of_bounds(visited_units, x-1, y) and not visited_units[y, x-1]:
                    next_active_units.append(units[y, x-1])
                    visited_units[y, x-1] = True
                if not is_out_of_bounds(visited_units, x+1, y) and not visited_units[y, x+1]:
                    next_active_units.append(units[y, x+1])
                    visited_units[y, x+1] = True
                if not is_out_of_bounds(visited_units, x, y-1) and not visited_units[y-1, x]:
                    next_active_units.append(units[y-1, x])
                    visited_units[y-1, x] = True
                if not is_out_of_bounds(visited_units, x, y+1) and not visited_units[y+1, x]:
                    next_active_units.append(units[y+1, x])
                    visited_units[y+1, x] = True

            # after working through all units with a given distance, go to the next "layer" of units
            distance += 1

    def calculate_input_distance(self, input_vector):
        return math.sqrt(np.sum((self.weights-input_vector)**2))

    def adjust_weights(self, input_vector, epoch, best_unit):
        self.weights += self._learning_rate(epoch) * self._topological_distance(epoch, best_unit) * (input_vector - self.weights)

    def _topological_distance(self, epoch, best_unit):
        return math.exp(-(self.unit_distances[hash(best_unit)]**2)/(2 * self._neighborhood_size(epoch) **2))

    def _neighborhood_size(self, epoch):
        return self.initial_radius * math.exp(-(epoch)/(self.radius_decay))

    def _learning_rate(self, epoch):
        return self.initial_learning_rate * math.exp(-(epoch)/(self.learning_rate_decay))

    def get_weights(self):
        return self.weights

In [3]:
# Mnemonic SOM implementation
# In comparison to the SOMToolbox, no active_unit_matrix is used which reduces the number of loops.
# Instead, the calculation of distances is moved to the SOMUnit object and only the 2D array for the units are needed.

import numpy as np
import random as rd
from PIL import Image

def _convert_to_binary(image):
    return np.amax(image, axis=2)//255

# checks, whether the given position is active in the image or if it should be ignored in the MnemonicSOM training
def is_active_unit(image, x, y, width, height):
    x0 = min(x, image.shape[1])
    y0 = min(y, image.shape[0])
    x1 = min(x + width, image.shape[1])
    y1 = min(y + height, image.shape[0])

    return (np.sum(image[y0:y1, x0:x1])/image[y0:y1, x0:x1].size) > 0.5


class MnemonicSOM:

    def __init__(self, width, height, input_data, siluette_path, epochs = 10, initial_learning_rate = .1, learning_rate_decay = 10, initial_radius = 1, radius_decay = 10):
        self.width = width
        self.height = height
        self.input_data = input_data
        self.siluette_path = siluette_path
        self.epochs = epochs
        self.initial_learning_rate = initial_learning_rate
        self.learning_rate_decay = learning_rate_decay
        self.initial_radius = initial_radius
        self.radius_decay = radius_decay

        self._load_siluette()

    # load image and call construction of unit array
    def _load_siluette(self):
        img = Image.open(self.siluette_path)
        img = _convert_to_binary(np.asarray(img))

        self._convert_to_units(img)

    # construct an 2D array of units depending on the given binary image.
    def _convert_to_units(self, image):
        unit_width = image.shape[1]/self.width
        unit_height = image.shape[0]/self.height

        # Initialize all cells of the array with None-objects
        self.units = np.empty([self.width, self.height], dtype=np.dtype(object))

        # add a new unit for each active position according to the image
        for l in range(self.height):
            for c in range(self.width):
                x = int(c * unit_width)
                y = int(l * unit_height)

                if is_active_unit(image, x, y, int(unit_width), int(unit_height)):
                    self.units[c, l] = SOMUnit(c, l, self.input_data["vec_dim"],
                        initial_learning_rate = self.initial_learning_rate,
                        learning_rate_decay = self.learning_rate_decay,
                        initial_radius = self.initial_radius,
                        radius_decay = self.radius_decay)

        self.units = self.units.T

        # calculate the distances for each SOMUnit object to all other units
        for line in self.units:
            for unit in line:
                if not unit is None:
                    unit.calculate_unit_distances(self.units)

    # train on the generated 2D array with given input data
    def train(self, verbose=False):
        for epoch in range(1, self.epochs+1):
            if verbose: print("training epoch: ", epoch, "                          ")
            input_set = self.input_data['arr'].copy().tolist()

            while len(input_set) > 0:
                i = rd.randint(0, len(input_set)-1) # take random input
                vec = input_set.pop(i)
                best_unit = self._get_best_unit(vec) # find best unit
                for line in self.units: # adjust weights
                    for unit in line:
                        if not unit is None:
                            unit.adjust_weights(vec, self.epochs, best_unit)
                if verbose: print("vectors left: ", len(input_set), "                          ", end = "\r")
        if verbose: print("")

    def _get_best_unit(self, input_vector):
        best_unit = None
        best_value = -1

        for line in self.units:
            for unit in line:
                if not unit is None:
                    value = unit.calculate_input_distance(input_vector)
                    if best_value == -1 or value < best_value:
                        best_unit = unit
                        best_value = value

        return best_unit

    # maps the trained weights of the units after the call of the train() method to a numpy array and returns it.
    def get_weights(self):
        som_weights = []

        for line in self.units:
            for unit in line:
                if not unit is None:
                    som_weights.append(unit.weights)
                else:
                    som_weights.append(np.full(self.input_data["vec_dim"], 0))

        return np.array(som_weights)
