In [1]:
import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
import data_tools as dt

In [2]:
from sklearn.datasets import make_blobs
X,y=make_blobs(n_samples=50,n_features=3,random_state=69,centers=3)

In [3]:
X=torch.from_numpy(X).type(torch.float16)
y=torch.from_numpy(y).type(torch.float16)

INITIALIZE GRID_SIZE OF KOHONEN LAYER,INPUT DIMENSION AND LEARNING RATE

In [4]:
grid=(20,20)
input_dim=3
lr=1e-2

CREATE THE KOHONEN LAYER

In [5]:
som_grid=nn.Parameter(torch.randn(grid[0],grid[1],input_dim,dtype=torch.float16)).detach()

In [6]:
def nbrs_func(win,pos,sig):
    return torch.exp(-torch.linalg.vector_norm(win-pos)/(2*sig**2))

def mex_hat(distance, sigma):
    return (1 - (distance**2 / (2 * sigma**2))) * np.exp(-distance**2 / (2 * sigma**2))

In [7]:
%%time
win=[]
dist=[]
inf=[]
dist2=[]
epochs=1
for epoch in range(epochs):
    for sample in X:

        #find winner of grid
        distances=torch.linalg.vector_norm(som_grid-sample,dim=2)
        dist.append(distances)
        winner=torch.argmin(distances)
        win.append(winner)
        #decay learning rate
        lr*=0.99

        #iterate over each element of Kohonen Layer
        for i in range(grid[0]):
            for j in range(grid[1]):
                distance=torch.linalg.vector_norm(som_grid[i,j]-winner,2)
                dist.append(distance)
                #calculate influence
                influence=mex_hat(distance,sigma=2)
                influence=influence.detach()
                inf.append(influence)
                #update/increment perceptron weights
                som_grid+=lr*influence*(sample-som_grid[i,j])



CPU times: user 5.7 s, sys: 27.3 ms, total: 5.73 s
Wall time: 5.79 s


In [8]:
X.shape

torch.Size([50, 3])

In [9]:
def main(epoch):
    for sample in X:

        #find winner of grid
        winner=torch.argmin(torch.linalg.vector_norm(sample-copy,dim=2))

        #initialize the sigma
        sig=max(grid)/2

        #decay learning rate
        lr*=(1-1e-5)

        #iterate over each element of Kohonen Layer
        for i in range(grid[0]):
            for j in range(grid[1]):
                #calculate influence
                influence=nbrs_func(torch.Tensor([i,j]),winner,sig)
                influence=influence.detach()
                #update/increment perceptron weights
                copy+=lr*influence*(sample-copy[i,j])

    som_grid.data=copy.data.clone()

In [19]:
class NeuralGas(nn.Module):
    def __init__(self,input_dim,map_size,lr=1e-2,decay=0.9999):
        super().__init__()
        
        self.learning_rate=lr
        self.decay_rate=decay
        self.map_size=map_size
        
        self.neurons=nn.Parameter(torch.rand(map_size,input_dim))
        
    def forward(self,x):
        
        dist=torch.linalg.vector_norm(x.unsqueeze(1)-self.neurons,dim=2)
        
        winner=torch.argmin(dist,dim=1)
        
        return winner
        
    def update(self,x,epoch):
        lr=self.learning_rate * (self.decay_rate ** epoch)
        winner=self.forward(x)
        
        for i in range(self.map_size):
            mask=(winner == i).unsqueeze(1)
            self.neurons.data[i]+=self.learning_rate*(x[mask.expand(50,3)].reshape(-1,3).mean(dim=0)\
                                                       -self.neurons.data[i])
    

In [40]:
winner=model.forward(X)
mask=(winner == i).unsqueeze(1)
model.neurons.data[i]+=model.learning_rate*(X[mask.expand(50,3)].reshape(-1,3).mean(dim=0)\
                                                           -model.neurons.data[i])

In [29]:
winner=model.forward(X)
for i in range(model.map_size):
    mask=(winner == i).unsqueeze(1)
    model.neurons.data[i]+=model.learning_rate*(X[mask.expand(50,3)].reshape(-1,3).mean(dim=0)\
                                                           -model.neurons.data[i])

In [39]:
model.neurons.data

tensor([[0.5049, 0.5148, 0.4249],
        [0.9875, 0.1051, 0.5308],
        [0.5686, 0.4481, 0.6141],
        ...,
        [0.4956, 0.3393, 0.0039],
        [0.9859, 0.8203, 0.1110],
        [   nan,    nan,    nan]])

In [18]:
(1-1e-4)**400

0.9607875174472561

In [None]:
X[mask.expand(50,3)].reshape(-1,3)

In [None]:
selected_values

In [33]:
model=NeuralGas(input_dim=3,map_size=400,lr=24)
epochs=100

In [None]:
z=X[mask.expand(50,3)].reshape(-1,3)

In [13]:
model.neurons.shape

torch.Size([400, 3])

In [None]:
z.reshape(1,-1)

In [None]:
model.neurons.data[2].shape

In [21]:
for epoch in range(2):
    model.update(X,epoch)
    

In [23]:
model.neurons.data

tensor([[nan, nan, nan],
        [nan, nan, nan],
        [nan, nan, nan],
        ...,
        [nan, nan, nan],
        [nan, nan, nan],
        [nan, nan, nan]])

In [None]:
import concurrent.futures
import multiprocessing
epochs=2000
num_cores = multiprocessing.cpu_count()
with concurrent.futures.ProcessPoolExecutor(max_workers=num_cores) as executor:
    # Submit the process_epoch function for each epoch in parallel
    futures = [executor.submit(main, epoch) for epoch in range(epochs)]

    # Wait for all tasks to complete
    concurrent.futures.wait(futures)

In [None]:
image=torch.sigmoid(som_grid).type(torch.float)

In [None]:
plt.matshow(image[:,:,1].detach().numpy())
plt.colorbar()

In [None]:
som_grid

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Assuming you have your SOM grid (som_grid) and data for calculation
# Define a function to calculate Euclidean distance
def euclidean_distance(a, b):
    return np.linalg.norm(a - b)

# Initialize the U-Matrix
u_matrix = np.zeros(som_grid.shape[:2])

# Iterate through each neuron in the SOM grid
for i in range(som_grid.shape[0]):
    for j in range(som_grid.shape[1]):
        # Calculate the average distance to neighboring neurons
        total_distance = 0
        num_neighbors = 0
        for x in range(i-1, i+2):
            for y in range(j-1, j+2):
                if 0 <= x < som_grid.shape[0] and 0 <= y < som_grid.shape[1]:
                    total_distance += euclidean_distance(som_grid[i, j], som_grid[x, y])
                    num_neighbors += 1
        u_matrix[i, j] = total_distance / num_neighbors

# Visualize the U-Matrix as a heatmap
plt.imshow(u_matrix, cmap='viridis')  # You can choose a colormap of your preference
plt.title('U-Matrix')
plt.colorbar()
plt.show()


In [None]:
som_grid.shape

In [None]:
x=0.2*0.99**50
x

In [None]:
help(torch.argmin())