In [None]:
import pandas as pd
import numpy as np
import networkx as nx

In [None]:
import holoviews as hv
import panel as pn
import hvplot
import hvplot.pandas
from holoviews import opts
hv.extension('bokeh')
defaults = dict(width=400, height=400)
hv.opts.defaults(
    opts.EdgePaths(**defaults), opts.Graph(**defaults), opts.Nodes(**defaults))

In [None]:
# Set the data X
X = []
labels = []

size = 4

X.extend(np.random.normal(loc=[-size, -size], size=(25, 2)))
labels.extend([0 for _ in range(25)])

X.extend(np.random.normal(loc=[size, size], size=(25, 2)))
labels.extend([1 for _ in range(25)])

X.extend(np.random.normal(loc=[size, -size], size=(25, 2)))
labels.extend([2 for _ in range(25)])

X.extend(np.random.normal(loc=[-size, size], size=(25, 2)))
labels.extend([3 for _ in range(25)])

X = np.array(X)
X_df=pd.DataFrame(X,columns= ['X','Y'])
X_df['labels']=labels

colors=['red','blue','green','orange','gray']
colors1=['gray','red','blue','green','orange']


In [None]:
data_plot=X_df.hvplot.scatter('X', 'Y', by='labels', marker='o', size=100, color=colors  ).opts(
                                height=500, width=700,
                                tools=['hover'],  toolbar= 'left',
                                title="Пространство признаков", legend_position='right', show_grid=True,
                               
    )
data_plot

In [None]:
# Set random weights with the same dimensions as the data
s = int(np.ceil(np.sqrt(X.shape[0])))
d = 2
xx = np.linspace(-d, d, s)
yy = np.linspace(-d, d, s)
W = np.array([[x, y] for y in yy for x in xx])

In [None]:
# Use networkx to define topology and neighbourhood connections
# The i'th row of the weights maps to the i'th node in the network
# Using a lattice as an example
G = nx.grid_2d_graph(m=s, n=s)
position = nx.spring_layout(G, scale=1)

# Map coordinates to index and index to coordinates
c2i = {c: i for i, c in enumerate(G.nodes())}
i2c = {v: k for k, v in c2i.items()}
# If there are more nodes than weights, append difference to match size
diff = np.abs(len(G.nodes()) - W.shape[0])
if diff > 0:
    W = np.concatenate([W, np.random.random(size=(diff, W.shape[1]))])

In [None]:
for i, c in enumerate(G.nodes()):
    position[c]=np.array(W[i])
node_plot=hv.Graph.from_networkx(G, position)
node_plot.opts(inspection_policy='nodes', edge_color='gray', edge_hover_line_color='black', edge_alpha=0.5,
               node_color='gray', node_hover_fill_color='black', node_alpha=0.5,
               node_size=10 )
data_plot*node_plot

In [None]:
# Function for learning rate
def f1(x, k):
    return np.exp(x * k)

# Function for neighbourhood restraint
def f2(x, s):
    return np.exp(-(x / (s ** 2)))

# Compute learning rate
def learning_rate(s, k):
    return f1(x=s, k=k)

# Compute neighbourhood restraint
def restraint(G, best, n, sigma=1):
    dist = nx.shortest_path_length(G, source=best, target=n)
    return f2(dist, s=sigma)

In [None]:
import time
import progressbar

# Keep track of which unit (neuron) won what
winners = {i: [] for i in range(W.shape[0])}

max_iter = 1000 #70000

weights = []
weights.append(np.copy(W))

with progressbar.ProgressBar(max_value=max_iter, enable_colors=False) as bar:
    # Start training loop
    for s in range(max_iter):
        bar.next()
        # Pick random data point
        r_idx = np.random.randint(X.shape[0])
        x = X[r_idx, :]
    
        # Find the best matching unit (BMU) using Euclidean distance
        x_stack = np.stack([x]*W.shape[0], axis=0)
        dists = np.linalg.norm(x_stack - W, axis=1)
        best_idx = np.argmin(dists)
    
        # Set learning rate
        k = -(1/500) #/ -(1/1000)
        a = learning_rate(s, k)
        
        
        # Update weights
        W[best_idx, :] = W[best_idx, :] + a * (x - W[best_idx, :])
        
    
        # Add the index of the data point to the "won" list
        winners[best_idx].append(r_idx)
    
        
        # Update weights of neighbours
        immediate_n = list(G[i2c[best_idx]])
        for n in immediate_n:
            W[c2i[n], :] = W[c2i[n], :] + restraint(G, i2c[best_idx], n) * a * (x - W[c2i[n], :])
        
    
        # # Update all weights
        # for n in G.nodes():
        #     W[c2i[n], :] = W[c2i[n], :] + restraint(G, i2c[best_idx], n) * a * (x - W[c2i[n], :])
            
        weights.append(np.copy(W))

In [None]:
for i, c in enumerate(G.nodes()):
    position[c]=np.array(W[i])
    
node_plot=hv.Graph.from_networkx(G, position)
node_plot.opts(inspection_policy='nodes', edge_color='gray', edge_hover_line_color='black', edge_alpha=0.5,
               node_color='gray', node_hover_fill_color='black', node_alpha=0.5,
               node_size=10 )
data_plot*node_plot

In [None]:
n_iter = pn.widgets.IntSlider(name='Итерация', start=0, end=max_iter-1, value=0,  width = 250)

def get_net(n_iter):
    Net_weights=weights[n_iter]
    for i, c in enumerate(G.nodes()):
        position[c]=np.array(Net_weights[i])
    node_plot=hv.Graph.from_networkx(G, position)
    node_plot.opts(inspection_policy='nodes', edge_color='gray', edge_hover_line_color='black', edge_alpha=0.5,
               node_color='gray', node_hover_fill_color='black', node_alpha=0.5,
               node_size=10 )
    return (data_plot*node_plot)

pn.Row(
       
    pn.pane.HoloViews(
        pn.bind(get_net, n_iter)
    ).servable(),
    pn.WidgetBox(
        pn.Column(
            "Карта Кононена",
            n_iter,
            height = 400,
            ).servable(target='sidebar')
    ),
)
    
    

In [None]:
# Match unit to most "won" data point, else sign no label
def best_match(idxs):
    idx = None
    if len(idxs) > 0:
        idx = max(idxs, key=idxs.count)
    
    return idx

unit_match = {i: best_match(idxs) for i, idxs in winners.items()}

# Match unit with the label, if assigned index
unit_label = {i: labels[idx] for i, idx in unit_match.items() if idx}

# Plot the results as a scatter plot (optional, try heat map)
results = np.array([list(i2c[k]) for k in unit_label.keys()])
pred = list(unit_label.values())

unit_wins= [(i2c[k]) for k in unit_label.keys()]
unit_lose= set(G.nodes) - set(unit_wins)

node_lose= dict.fromkeys(unit_lose, -1)
node_wins = dict(zip(unit_wins, pred))

In [None]:
#nx.get_node_attributes(G,'cluster')

In [None]:
nx.set_node_attributes(G, node_lose | node_wins, 'cluster')
node_plot=hv.Graph.from_networkx(G, position)
node_plot.opts(inspection_policy='nodes', edge_color='gray', edge_hover_line_color='black', edge_alpha=0.5,
                node_hover_fill_color='black', node_alpha=0.95,
               node_size=10, node_color='cluster', cmap=colors1)
data_plot*node_plot

In [None]:
print("Missing nodes:")
print([i2c[k] for k, v in unit_match.items() if not v])