In [None]:
# pip install -U som-learn

In [None]:
# pip install sklearn-som

In [1]:
%config Completer.use_jedi = False

In [2]:
import os
import numpy as np
import cv2 as cv
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.metrics import recall_score
from sklearn.metrics import precision_score
import sys

from sklearn.utils import shuffle

In [3]:
from sklearn_som.som import SOM as SOM_sk

In [3]:
from collections import Counter
from somlearn import SOM

In [4]:
from skimage.util import random_noise
import copy

In [5]:
class Dataset():
    def read_dataset(self):
        directory = "Dataset/"
        train_data = []
        test_data = []
        train_label = [] 
        test_label = []

        for i in range(0,10):
            data_dir = directory + str(i) + '/'

            # bipolar one hot lable
            lable = -1 * np.ones((100, 19), dtype=int)     
            lable[:, i] = 1

            images = self.read_image(data_dir)

            X_train, X_test, y_train, y_test = train_test_split(images, lable, test_size=0.20, random_state=42)

            train_data.append(X_train)
            test_data.append(X_test)
            train_label.append(y_train)
            test_label.append(y_test)

        maths = ['add', 'dec', 'div', 'eq', 'mul', 'sub', 'x', 'y', 'z']
        for i, op in enumerate(maths):
            data_dir = directory + op + '/'

            # bipolar one hot lable
            lable = -1 * np.ones((100, 19), dtype=int)     
            lable[:, i+10] = 1

            images = self.read_image(data_dir)

            X_train, X_test, y_train, y_test = train_test_split(images, lable, test_size=0.20, random_state=42)

            train_data.append(X_train)
            test_data.append(X_test)
            train_label.append(y_train)
            test_label.append(y_test)

        train_data = np.concatenate(train_data, axis=0)
        test_data = np.concatenate(test_data, axis=0)
        train_label = np.concatenate(train_label, axis=0)
        test_label = np.concatenate(test_label, axis=0)

        self.train_data = train_data
        self.test_data = test_data
        self.train_label = train_label
        self.test_label = test_label

        X_train, y_train = shuffle(self.train_data, self.train_label, random_state=0)
        self.train_data_shuffle = X_train
        self.train_label_shuffle = y_train
        
        
        X_test, y_test = shuffle(self.test_data, self.test_label, random_state=0)
        self.test_data_shuffle = X_test
        self.test_label_shuffle = y_test
        
    def read_image(self, directory):
        path = os.path.join(directory)
        images = []
        img_size = 100
        for img in os.listdir(path):
            img_array = cv.imread(os.path.join(path,img), cv.IMREAD_GRAYSCALE)
            if img_array is not None:
                new_image = cv.resize(img_array, (img_size, img_size))
                images.append(new_image / 255)
        
        images = np.array(images)    
        flatten_img = images.reshape(100,(img_size*img_size))
        return flatten_img


In [56]:
class SOM_Mine():
    def __init__(self):
        self.weights = None
        self.cluster_label = None
    
    def set_cluster_label(self, y, labels):
        list_mode = []
        for inde_la in range(self.num_nerons):
            index_c0 = np.argwhere(labels == inde_la).reshape(1, -1)

            if self.single_label == False:
                true_class = [np.argmax(y[i]) for i in index_c0[0]]
            else:
                true_class = [y[i] for i in index_c0[0]]
        
            # majority vote
            list_mode.append(Counter(true_class).most_common(1)[0][0])
        
        return np.array(list_mode)
    
    
    def fit(self, X, y, num_col = 8, num_row = 5, single_label = False):
        self.num_classes = 19
        self.single_label = single_label
        self.num_nerons = num_col * num_row
        
        self.som = SOM(n_columns = num_col, n_rows = num_row, random_state = 1)
        labels = self.som.fit_predict(X, epochs = 200, scale0 = 0.01)
        
        dic_labels = sorted(Counter(labels).items())
        assert len(dic_labels) == self.num_nerons
        
        # weights
        self.weights = self.som.algorithm_.codebook.reshape(self.num_nerons, -1)
        
        # cluster labels
        self.cluster_label = self.set_cluster_label(y, labels)
        
        return self.weights, self.cluster_label
    

In [8]:
class LVQ():
    def __init__(self, initial_codebook, cluster_label, alpha = 1e-4):
        
        self.alpha = alpha
        
        self.weights = initial_codebook  # (num_nerons, 100*100)
        self.cluster_label = cluster_label  # (num_nerons,)
        
        self.num_nerons = self.weights.shape[0]
        assert self.num_nerons == len(self.cluster_label)

        
    def euclidean_distance(self, x, w):
        # x is (num_nerons, 100*100)
        # w is (num_nerons, 100*100)
        # output is (num_nerons,)
        
        distance = np.zeros(self.num_nerons)
        for i in range(len(x)):
            distance[i] = np.sum((x[i] - w[i])**2)
        return distance
    
    
    def fit(self, X, y_, max_epoch = 200, decrese_alpha = True, single_label = False):
        self.single_label = single_label
        epoch = 0
        flag = True
        while flag:
            print('epoch', epoch)
            
            # decrese alpha
            if decrese_alpha:
#                 rate = self.alpha * (1.0-(epoch/float(max_epoch)))
                if epoch % 100 == 0 and epoch != 0:
                    rate = self.alpha / 10
                else:
                    rate = self.alpha
            else:
                rate = self.alpha
               
            sames_la = 0
            not_sames = 0
            for f_img, label_ in zip(X, y_):
                image = f_img.reshape(1, -1)
                
                if self.single_label == False:
                    label = label_.reshape(19, 1)
                    label = label.astype(float)
                    t = np.argmax(label)
                else:
                    t = label_
                
                one_vector = np.ones((self.num_nerons, 1))
                eu_dis = self.euclidean_distance(one_vector @ image, self.weights)
                
                k = np.argmin(eu_dis)
                
                
                # update
                delta = rate * (image - self.weights[k])
                                                
                if t == self.cluster_label[k]:         
                    sames_la += 1
                    self.weights[k] += delta[0]
                else:
                    not_sames += 1
                    self.weights[k] -= delta[0]
                
            
            if epoch < max_epoch:
                epoch += 1
                flag = True
            else:
                flag = False
    
    
    def predict(self, x_img):
        image = x_img.reshape(1 , -1)
        
        one_vector = np.ones((self.num_nerons, 1))
        eu_dis = self.euclidean_distance(one_vector @ image, self.weights)

        k = np.argmin(eu_dis)

        return self.cluster_label[k]
    
    
    def test(self, X, y_):
        y_pred = []
        y_true = []
        for f_img, label in zip(X, y_):
            class_predict = self.predict(f_img)
            y_pred.append(class_predict)
            
            if self.single_label == False:
                class_true = np.argmax(label)
            else:
                class_true = label
                
            y_true.append(class_true)
            
        self.y_true = y_true
        self.y_pred = y_pred
        print('accuracy: ',accuracy_score(self.y_true, self.y_pred)* 100)
        print('\n')
        print('precision: ',precision_score(self.y_true, self.y_pred, average=None, zero_division = 0) * 100)
        print('***')
        print('precision: ',precision_score(self.y_true, self.y_pred, average='macro', zero_division = 0) * 100)
        print('\n')
        print('recall: ', recall_score(self.y_true, self.y_pred, average=None) * 100)
        print('***')
        print('recall: ', recall_score(self.y_true, self.y_pred, average='macro') * 100)
        
        return self.y_true, self.y_pred

In [9]:
dataset = Dataset()
dataset.read_dataset()

In [57]:
Mine_SOM = SOM_Mine()
we, clus = Mine_SOM.fit(dataset.train_data_shuffle, dataset.train_label_shuffle, num_col = 10, num_row = 5)
print('weight shape', we.shape)
print('cluser shape', clus.shape)
print('cluster', clus)


weight shape (50, 10000)
cluser shape (50,)
cluster [14  8  8 12 12 18 17  5  6 15  9  1  5 16 18  8  1 16 17  0  7 15 15 16
 16  7 11  2 17 18 11 11  2 17 13  0  5  5 13 13  0  2  0  6 10  7  2  4
 10 13]


In [59]:
# print(sorted(clus))

In [25]:
init_weight = copy.deepcopy(we)
init_clus = copy.deepcopy(clus)

In [None]:
lvq = LVQ(init_weight, init_clus, alpha = 1e-3)
lvq.fit(dataset.train_data_shuffle, dataset.train_label_shuffle, max_epoch = 1, decrese_alpha =True)


In [35]:
print('Test')
test_y_true , test_y_pred = lvq.test(dataset.test_data_shuffle, dataset.test_label_shuffle)
print(100*'/')
print('\n')
print('Train')
train_y_true , train_y_pred = lvq.test(dataset.train_data_shuffle, dataset.train_label_shuffle)

Test
accuracy:  40.526315789473685


precision:  [11.76470588 32.35294118 31.57894737  0.         40.         41.66666667
 20.         23.80952381 29.16666667 50.         62.5        53.33333333
 50.         46.15384615 56.         38.0952381  50.         45.83333333
 40.74074074]
***
precision:  38.05241806455755


recall:  [10. 55. 30.  0. 40. 50. 10. 25. 35. 35. 50. 80. 35. 60. 70. 40. 35. 55.
 55.]
***
recall:  40.526315789473685
////////////////////////////////////////////////////////////////////////////////////////////////////


Train
accuracy:  57.10526315789474


precision:  [57.53424658 47.32142857 62.85714286  0.         52.72727273 66.31578947
 64.61538462 56.4516129  66.66666667 52.30769231 54.94505495 55.39568345
 56.17977528 58.5106383  52.17391304 53.62318841 62.71186441 59.45945946
 55.55555556]
***
precision:  54.49222997610388


recall:  [52.5  66.25 55.    0.   36.25 78.75 52.5  43.75 55.   42.5  62.5  96.25
 62.5  68.75 75.   46.25 46.25 82.5  62.5 ]
***
recall:  57

## Noise

In [39]:
def noise_img(test_data, amount= 0.1):
    new_test = []
    for img in test_data:
        temp_img = copy.deepcopy(img)
        sp = random_noise(temp_img, mode = 's&p', amount = amount)
        new_test.append(sp)
        
    return np.array(new_test)

In [40]:
new_train_10_noise = noise_img(dataset.train_data_shuffle, amount= 0.1)
new_train_10_noise.shape

(1520, 10000)

In [41]:
new_train_20_noise = noise_img(dataset.train_data_shuffle, amount= 0.2)
new_train_20_noise.shape

(1520, 10000)

In [42]:
print('Train with 10% noise')
test_y_true , test_y_pred = lvq.test(new_train_10_noise, dataset.train_label_shuffle)


Train with 10% noise
accuracy:  52.69736842105262


precision:  [ 59.15492958  94.          76.78571429   0.          52.
  50.          75.55555556  68.18181818  45.28301887  52.38095238
  68.91891892 100.          51.64835165  56.98924731  63.04347826
  83.87096774  57.62711864  83.33333333  16.90544413]
***
precision:  60.8252025702531


recall:  [52.5  58.75 53.75  0.   32.5  80.   42.5  37.5  60.   41.25 63.75 57.5
 58.75 66.25 72.5  32.5  42.5  75.   73.75]
***
recall:  52.69736842105264


In [43]:
print('Train with 20% noise')
test_y_true , test_y_pred = lvq.test(new_train_20_noise, dataset.train_label_shuffle)


Train with 20% noise
accuracy:  40.19736842105263


precision:  [ 84.61538462 100.          88.63636364   0.          70.58823529
  53.06122449 100.         100.          47.82608696  75.
  82.           0.          60.81081081  76.5625      84.74576271
 100.          89.65517241  90.69767442   8.91719745]
***
precision:  69.11139014734135


recall:  [41.25 18.75 48.75  0.   30.   65.   31.25 20.   55.   37.5  51.25  0.
 56.25 61.25 62.5  16.25 32.5  48.75 87.5 ]
***
recall:  40.19736842105262
