In [1]:
import numpy as np
from help_functions import *

In [46]:
class SOM:
    def __init__(self, map_width, map_height, input_dim, neighbourhood_function,
                 distance_k, learning_rate=0.5, sigma=None, decay_type='exponential', beta=0.999):
        self.map_width = map_width
        self.map_height = map_height
        self.input_dim = input_dim
        self.learning_rate = learning_rate
        self.distance_k = distance_k
        self.beta = beta
        self.time = 1
        self.label_map_db = {}
        self.tmp = {}
        
        if sigma is None:
            self.sigma = max(map_width, map_height) / 2.0
        else:
            self.sigma = sigma
            
        if self.distance_k == np.inf:
            self.calculate_distance_func = chebyshev_distance
        elif self.distance_k == 1:
            self.calculate_distance_func = manhattan_distance
        elif self.distance_k == 2:
            self.calculate_distance_func = euclidean_distance
        elif self.distance_k < 1:
            raise ValueError('Distance must have positive non-zero k value')
        else:
            self.calculate_distance_func = lambda a, b, axis: generic_distance(a, b, axis, self.distance_k)
            
        # TODO add other weight initialization options
        self.weights = np.random.rand(self.map_width, self.map_height, self.input_dim)
        
        if neighbourhood_function == 'gaussian':
            self.neighbourhood_func = gaussian_neighbourhood
        elif neighbourhood_function == 'rectangular':
            self.neighbourhood_func = rectangular_neighbourhood
        elif neighbourhood_function == 'triangular':
            self.neighbourhood_func = triangular_neighbourhood
        elif neighbourhood_function == 'cosine':
            self.neighbourhood_func = cosine_down_to_zero_neighbourhood
        else:
            raise ValueError(f'Unknown neighbourhood function {neighbourhood_function}')
        
        
        if decay_type == 'exponential' and 0 < self.beta < 1:
            self.calculate_decay = decay_exponential
        elif decay_type == 'power' and self.beta < 0:
            self.calculate_decay = decay_power
        else:
            raise ValueError(f'Unknown decay type or invalid beta')
            
    def get_weights(self):
        return self.weights
    
    def get_weight_of_node(self, node_idx):
        return self.weights[node_idx[0]][node_idx[1]]
    
    def update_time(self):
        self.time += 1
        
    def find_BMU(self, input_vector):
        dists = self.calculate_distance_func(self.weights, input_vector, 2)

        min_index = np.argmin(dists)
        bmu_idx = np.unravel_index(min_index, dists.shape)
        return bmu_idx
    
    def calculate_grid_distances(self, bmu_idx):
        x_coords, y_coords = np.meshgrid(np.arange(self.map_width), 
                                         np.arange(self.map_height), indexing='ij')
        dist_sq = (x_coords - bmu_idx[0])**2 + (y_coords - bmu_idx[1])**2
        return np.sqrt(dist_sq)
            
    def calculate_neighbourhood_influence(self, bmu_idx, sigma_t):
        grid_dists = self.calculate_grid_distances(bmu_idx)
        return self.neighbourhood_func(grid_dists, sigma_t)
    
    def update_weights(self, input_vector, bmu_idx):
        eta_t = self.calculate_decay(self.learning_rate, self.beta, self.time)
        sigma_t = self.calculate_decay(self.sigma, self.beta, self.time)
        
        # shape (map_width, map_height)
        influence = self.calculate_neighbourhood_influence(bmu_idx, sigma_t)
        
        # shape (map_width, map_height, input_dim)
        diff = input_vector - self.weights
        
        # reshaping to (map_width, map_height, 1) to broadcast over diff
        influence_new = influence[:, :, np.newaxis]
        
        self.weights += eta_t * influence_new * diff
        
    def calculate_QE(self, data):
        diff_total = 0
        for sample in data:
            bmu_idx = self.find_BMU(sample)      
            weight = self.get_weight_of_node(bmu_idx)
            
            diff_total += self.calculate_distance_func(weight, sample, 0)
        
        return diff_total / data.shape[0]
    
    def get_label_map(self, data, label, epoch):
    # https://medium.com/data-science/understanding-self-organising-map-neural-network-with-python-code-7a77f501e985
        map = np.empty(shape=(self.map_width, self.map_height), dtype=object)

        for row in range(self.map_width):
          for col in range(self.map_height):
            map[row][col] = []
    
        for i, sample in enumerate(data):
            bmu_idx = self.find_BMU(sample)
            map[bmu_idx[0]][bmu_idx[1]].append(label[i])
        
        self.tmp[epoch] = np.copy(map)
        
        for row in range(self.map_width):
            for col in range(self.map_height):
                label_list = map[row][col]
                if len(label_list)==0:
                  label = None
                else:
                  label = max(label_list, key=label_list.count)
                map[row][col] = label
                
        return map
    
    def train_online(self, data, label, num_epochs):
        num_samples = data.shape[0]
        
        for epoch in range(num_epochs):
            for sample in range(num_samples):
                input_vector = data[sample]
                
                bmu_idx = self.find_BMU(input_vector)
                
                self.update_weights(input_vector, bmu_idx)
                
            self.update_time()
            
            if epoch % 10 == 0:
                print(f"Epoch {epoch+1}/{num_epochs} complete. Current sigma: {self.calculate_decay(self.sigma, self.beta, self.time):.4f}, learning rate: {self.calculate_decay(self.learning_rate, self.beta, self.time):.4f}, QE: {self.calculate_QE(data):.4f}")
                
            if epoch % 100 == 0:    
                self.label_map_db[epoch] = self.get_label_map(data, label, epoch)
    
    

In [47]:
som = SOM(3,3,3,"gaussian",2)
som.get_weights()

array([[[0.44135638, 0.01785546, 0.55622023],
        [0.81823452, 0.04625477, 0.29157827],
        [0.95821023, 0.1434274 , 0.86052974]],

       [[0.65646869, 0.98911965, 0.59175068],
        [0.02391191, 0.4927756 , 0.26858481],
        [0.88144023, 0.50996657, 0.32268189]],

       [[0.19394748, 0.86635415, 0.96254648],
        [0.33791081, 0.07737867, 0.2179625 ],
        [0.06632761, 0.47243489, 0.76249639]]])

In [48]:
data = np.array([[1, 1,1], [3, 4,3], [1, 8,4], [7, 8,7]])
label = np.array([1, 3, 3, 2])
som.train_online(data, label, 1000)

Epoch 1/1000 complete. Current sigma: 1.4970, learning rate: 0.4990, QE: 2.6252
Epoch 11/1000 complete. Current sigma: 1.4821, learning rate: 0.4940, QE: 2.5512
Epoch 21/1000 complete. Current sigma: 1.4673, learning rate: 0.4891, QE: 2.5533
Epoch 31/1000 complete. Current sigma: 1.4527, learning rate: 0.4842, QE: 2.5557
Epoch 41/1000 complete. Current sigma: 1.4383, learning rate: 0.4794, QE: 2.5583
Epoch 51/1000 complete. Current sigma: 1.4240, learning rate: 0.4747, QE: 2.5434
Epoch 61/1000 complete. Current sigma: 1.4098, learning rate: 0.4699, QE: 2.5195
Epoch 71/1000 complete. Current sigma: 1.3957, learning rate: 0.4652, QE: 2.4941
Epoch 81/1000 complete. Current sigma: 1.3819, learning rate: 0.4606, QE: 2.4689
Epoch 91/1000 complete. Current sigma: 1.3681, learning rate: 0.4560, QE: 2.4442
Epoch 101/1000 complete. Current sigma: 1.3545, learning rate: 0.4515, QE: 2.4201
Epoch 111/1000 complete. Current sigma: 1.3410, learning rate: 0.4470, QE: 2.3966
Epoch 121/1000 complete. Cu

In [54]:
som.tmp[100]

array([[list([np.int64(2)]), list([]), list([])],
       [list([]), list([np.int64(3)]), list([np.int64(3)])],
       [list([]), list([]), list([np.int64(1)])]], dtype=object)

In [50]:
som.label_map_db[0]

array([[None, None, None],
       [np.int64(2), None, np.int64(3)],
       [np.int64(3), None, np.int64(1)]], dtype=object)

In [45]:
som.label_map_db[990]

array([[np.int64(3), None, np.int64(2)],
       [None, None, None],
       [np.int64(1), None, np.int64(3)]], dtype=object)