### Importing Libraries 

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd 
from sklearn.model_selection import train_test_split
import random
from statistics import mode

plt.rcParams["figure.figsize"] = (12,25)
cmap = plt.get_cmap("Set2")
Colors = np.array(cmap.colors)

### Loading Dataset & Split

In [6]:
X_train = pd.read_csv("./UCI HAR Dataset/train/X_train.txt",header=None,delim_whitespace=True).to_numpy()
X_test = pd.read_csv("./UCI HAR Dataset/test/X_test.txt",header=None,delim_whitespace=True).to_numpy()

Y_train = pd.read_csv("./UCI HAR Dataset/train/y_train.txt",header=None,delim_whitespace=True).to_numpy()
Y_test = pd.read_csv("./UCI HAR Dataset/test/y_test.txt",header=None,delim_whitespace=True).to_numpy()

X_train, X_valid, Y_train, Y_valid = train_test_split(X_train, Y_train, test_size=0.15,stratify=Y_train)


Total_X = np.concatenate((X_valid, X_train,X_test), axis=0)
Total_Y = np.concatenate((Y_valid, Y_train,Y_test), axis=0)



### Custom SOM Implementation

In [13]:
class CustomSOM : 
    """Storing size of SOM grid with feature vector len and Learning rate, radius"""
    def __init__(self,grid_size=(10,10),features_count=561,lr=0.01,radius=None):
        self.grid_size = grid_size 
        self.lr_0 = lr
        self.features_count = features_count 
        if radius is None : 
            radius = min(grid_size)
        self.radius_0 = radius
        self.reset()
        
    """Resting network training logs"""
    def reset(self):
        self.lr = self.lr_0
        self.radius = self.radius_0
        self.w_map_0 = np.random.RandomState(0).random((self.grid_size[0],self.grid_size[1],self.features_count)).astype(float)
        self.w_map = self.w_map_0.copy()
        self.dead_neurons = []
        self.distance_mean = []
        self.loss = []
        
    """Train SOM network"""
    def train(self,X,Ite,lr_decay=1,ErrorThresh=10**-20):
        self.reset()
        self.Ite = Ite
        non_deads =  np.zeros(self.grid_size)
        for ite in range(Ite):
            pre_w_map = self.w_map.copy()
            shuffled_index =  random.sample(list(range(X.shape[0])), X.shape[0])
            distance_mean = np.zeros_like(X[0])
            for index in range(X.shape[0]):
                x_sample = X[shuffled_index[index],:]
                winner = self.get_winner(x_sample)
                non_deads[winner[0],winner[1]] += 1 
                neighbour_mask = self.get_neighbour_mask(winner)
                self.w_map_update(x_sample,neighbour_mask,X.shape[0])

                winner = self.w_map[winner[0],winner[1]].copy()
                distance_mean += np.abs(x_sample-winner)
            

            deads = self.grid_size[0]*self.grid_size[1] - np.count_nonzero(non_deads) 
            self.dead_neurons.append(deads)
                        
            distance_mean /= X.shape[0]    
            self.distance_mean.append(distance_mean.mean())    

            if (ite/(Ite))*lr_decay > 0 :
                self.lr = self.lr_0 * (1-(ite/(Ite))*lr_decay)
                self.radius = self.radius_0 * (1-(ite/(Ite)))
            
            self.loss.append(np.linalg.norm(pre_w_map - self.w_map))
            

            if ite%5 == 0 or ite == Ite-1:
                print(f"Ite: {ite}, Loss: {self.loss[-1]:.4f}, lr: {self.lr:.4f},R: {self.radius}")
                     
            if self.loss[-1] <= ErrorThresh :
                print("Converged early in iteration #{0}".format(ite))
                return 
     
    """Visualize Clusters on grid of network"""       
    def visualize(self, X, Y):    
        predict = {}    
        for index in range(len(X)):
            x_sample = X[index, :]
            winner = self.get_winner(x_sample)
            if winner in predict :
                predict[winner].append(Y[index][0])
            else:
                predict[winner] = [Y[index][0]]
        img = np.zeros((self.grid_size[0],self.grid_size[1],3))
        img.fill(200)
        global Colors
        for i in range(self.grid_size[0]):
            for j in range(self.grid_size[1]):
                if tuple([i,j]) in predict:
                    color_index =  mode(predict[tuple([i,j])]) 
                    img[i,j] = Colors[color_index]    
                
        
        plt.imshow(img)
        plt.title("Cluster Vizualization | MIte:{0}/LR:{1} | Size:({2},{3}) | R:{4}".format(self.Ite,self.lr_0,self.grid_size[0],self.grid_size[1],self.radius_0))
        plt.axis("off")

    """Calculate U-Matrix, visualize and return it"""
    def get_u_matrix(self,show=False):
        u_matrix = np.zeros(self.grid_size)
        for i in range(self.grid_size[0]):
            for j in range(self.grid_size[1]):
                n = 0
                for ri in range(-1, 2):
                    for rj in range(-1, 2):
                        if 0 <= i + ri < self.grid_size[0] and 0 <= j + rj < self.grid_size[1]:
                            if np.sqrt(ri**2 + rj**2) <= 1 :
                                u_matrix[i,j] += (np.linalg.norm(self.w_map[i,j] - self.w_map[ri,rj]))
                                n +=1
                            
                u_matrix[i,j] /= n
        if show :
            sns.heatmap(u_matrix,annot=True,fmt=".3f",linewidths=2, linecolor='white',cmap="viridis") 
            plt.axis("off")
            plt.title("U-Matrix of SOM - MIteration:{0}/LearningRate:{1}".format(self.Ite,self.lr_0))
            
        return u_matrix
            
    """Transform a entire dataset to new feature space"""
    def dataset_transform(self,X):
        new_x = np.zeros((X.shape[0],self.grid_size[0],self.grid_size[1]))
        for index in range(X.shape[0]):
            new_x[index]= self.feature_transform(X[index])
        return new_x             
      
    """Transform single sample to new space"""      
    def feature_transform(self, x,show=False):
        rep_x = np.tile(x, (self.w_map.shape[0], self.w_map.shape[1], 1))
        dists = np.sum((self.w_map - rep_x) ** 2, axis=2)
        if show :
            plt.imshow(1/(1+dists),cmap="cool")
            plt.title("Feature Extracted map - MIteration:{0}/LearningRate:{1}".format(self.Ite,self.lr_0))
        return 1/(1+dists)
    
    """Calculate winner neuron entire grid"""        
    def get_winner(self,x_sample):   
        rep_x = np.tile(x_sample, (self.w_map.shape[0], self.w_map.shape[1], 1))
        dists = np.sum((self.w_map - rep_x) ** 2, axis=2)
        winner = np.unravel_index(np.argmin(dists, axis=None), shape=dists.shape)
        return winner
    
    """Get a mask that neighbours of winner included with their coef"""
    def get_neighbour_mask(self,winner_index):
        mask = np.zeros(self.grid_size)
        mask[winner_index[0],winner_index[1]] = 1
        Radius = int(self.radius)
        for ri in range(-Radius, Radius+1):
            for rj in range(-Radius, Radius+1):
                if 0 <= winner_index[0] + ri < self.w_map.shape[0] and 0 <= winner_index[1] + rj < self.w_map.shape[1]:
                    if np.sqrt(ri**2 + rj**2) > Radius :
                        mask[winner_index[0] + ri, winner_index[1] + rj] = 0                     
                    else:
                        mask[winner_index[0] + ri, winner_index[1] + rj] = 1/(1+np.sqrt(ri**2 + rj**2))           
        return mask 
    
    """Update weights of map during train based on neighbours mask of winner neuron"""
    def w_map_update(self,x_sample,neighbour_mask,X_len):
        neighbour_mask = np.repeat(neighbour_mask[:,:,np.newaxis],self.w_map.shape[2],axis=2)
        x_repeat = np.tile(x_sample, (self.w_map.shape[0], self.w_map.shape[1], 1))
        Delta = x_repeat - self.w_map
        self.w_map = self.w_map + (self.lr/X_len) * np.multiply(neighbour_mask, Delta)
                
            
        
        

### Main: Training SOM

In [14]:
som = CustomSOM((9,9),lr=0.8,radius=5,features_count=561)
som.train(X_valid,4000,lr_decay=1.1)

plt.subplot(5,1,1)
som.visualize(X_valid,Y_valid)
plt.subplot(5,1,2)
som.get_u_matrix(True)
plt.subplot(5,1,3)
data = {"Iteration":list(range(len(som.loss))),"Loss":som.loss}
sns.lineplot(data=data,x="Iteration",y="Loss")
plt.title("Loss of Network")
plt.subplot(5,1,4)
data = {"Iteration":list(range(len(som.dead_neurons))),"Dead Neuron":som.dead_neurons}
sns.lineplot(data=data,x="Iteration",y="Dead Neuron")
plt.title("Dead Neurons of Network")

plt.subplot(5,1,5)
data = {"Iteration":list(range(len(som.distance_mean))),"Mean Distance":som.distance_mean}
sns.lineplot(data=data,x="Iteration",y="Mean Distance")
plt.title("Mean Distance to Winners")
plt.savefig("decay {0} - radius {1} - grid size ({2}-{2})-3.png".format(1,5,9))
plt.clf()



Ite: 0, Loss: 39.9921, lr: 0.8000,R: 5
Ite: 5, Loss: 9.3588, lr: 0.7989,R: 4.99375
Ite: 10, Loss: 6.5624, lr: 0.7978,R: 4.987500000000001
Ite: 15, Loss: 4.6875, lr: 0.7967,R: 4.98125
Ite: 20, Loss: 2.6376, lr: 0.7956,R: 4.975
Ite: 25, Loss: 2.1322, lr: 0.7945,R: 4.96875
Ite: 30, Loss: 2.5664, lr: 0.7934,R: 4.9625
Ite: 35, Loss: 2.7047, lr: 0.7923,R: 4.95625
Ite: 40, Loss: 2.8242, lr: 0.7912,R: 4.95
Ite: 45, Loss: 2.9552, lr: 0.7901,R: 4.94375
Ite: 50, Loss: 2.5284, lr: 0.7890,R: 4.9375
Ite: 55, Loss: 2.2664, lr: 0.7879,R: 4.9312499999999995
Ite: 60, Loss: 2.1153, lr: 0.7868,R: 4.925
Ite: 65, Loss: 1.9591, lr: 0.7857,R: 4.91875
Ite: 70, Loss: 1.7518, lr: 0.7846,R: 4.9125000000000005
Ite: 75, Loss: 1.6365, lr: 0.7835,R: 4.90625
Ite: 80, Loss: 1.4330, lr: 0.7824,R: 4.9
Ite: 85, Loss: 1.2817, lr: 0.7813,R: 4.89375
Ite: 90, Loss: 1.1650, lr: 0.7802,R: 4.8875
Ite: 95, Loss: 1.1045, lr: 0.7791,R: 4.88125
Ite: 100, Loss: 1.0073, lr: 0.7780,R: 4.875
Ite: 105, Loss: 0.8972, lr: 0.7769,R: 4.86875

Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).


<Figure size 864x1800 with 0 Axes>

### Transform Dataset

In [19]:
new_valid = som.dataset_transform(X_valid)
new_train = som.dataset_transform(X_train)
new_test = som.dataset_transform(X_test)

### Save transformed Dataset

In [20]:
with open('X_test.npy', 'wb') as f:
    np.save(f, new_test)
    
with open('X_valid.npy', 'wb') as f:
    np.save(f, new_valid)

with open('X_train.npy', 'wb') as f:
    np.save(f, new_train)
    
with open('Y_test.npy', 'wb') as f:
    np.save(f, Y_test)
    
with open('Y_valid.npy', 'wb') as f:
    np.save(f, Y_valid)

with open('Y_train.npy', 'wb') as f:
    np.save(f, Y_train)
