In [None]:
import numpy as np
import pandas as pd
import math

def get_data(filepath):
    x_raw, y_raw = [], []
    with open(filepath, 'r', encoding='utf-8') as file:
        for line in file:
            line = [float(x) for x in line.strip().split(' ')]
            x_raw += [line[:-1]]
            y_raw += [line[-1]]
        
        return np.array(x_raw), np.array(y_raw)
    
def normalize(x):
    return (x_raw - x_raw.min(axis=0)) / (x_raw.max(axis=0) - x_raw.min(axis=0))
    
def shuffle(x, y):
    
    indice = np.arange(len(x))
    np.random.shuffle(indice)
    return np.array([x[i] for i in indice]),  np.array([y[i] for i in indice])

def split(x, y, split_ratio=2/3):
    
    train_len = math.ceil(len(x) * split_ratio)
    return x[:train_len], y[:train_len], x[train_len:], y[train_len:]

def plot_dataset(x, y):
    
    colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k', 'w']
    x = pd.DataFrame(x)
    classes = np.unique(y)
    
    fig, ax = plt.subplots()
    for i, c in enumerate(classes):
        plt.scatter(x[y == c][0], x[y == c][1], label='Class {}'.format(c), c=colors[i])

    plt.show()

# distance function
def euclidean_distance(x1, x2):
    return np.linalg.norm(x1 - x2, ord=2)
    
def time_constant(iteration, constant):
    # iteration: 迭代總次數
    # constant: 常數。這裡可以是有效寬度(sigma)初始值、學習率(leraning_rate)初始值
    return iteration / np.log(constant)

# 計算得勝類神經元 j* 的鄰近區域函數的強度
# 高斯型式之鄰近區域函數
def gaussian(d, sigma):
    # d: 得勝神經元 j* 與 類神經元 j 的側向連結距離
    # sigma: 有效寬度
    return np.exp(- (d**2) / (2 * sigma**2))
    
# 指數衰減函數
def exponential_decay(init, n, tau):
    # init: 欲衰減參數的初始值。ex: 有效寬度(sigma)初始值、學習率(leraning_rate)初始值
    # n: 第 n 次迭代數
    # tau: 時間常數

    return init * np.exp(-1 * n / tau)

def learning_rate_decay(n, iteration):
    return 0.9 * (1 - n/iteration)

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

class SOM:
    def __init__(self, row, col, input_dim, distance_func, kernel_func, sigma_decay_func, lr_decay_func):
        
        self.row = row
        self.col = col
        self.num = row * col
        self.input_dim = input_dim
        self.neuron_map = []
        
        self.distance_function = distance_func
        self.kernel_function = kernel_func
        self.sigma_decay_function = sigma_decay_func
        self.lr_decay_function = lr_decay_func
        
        self.init_neuron()
        self.plot_neuron_map()
        
    
    def init_neuron(self):
        for i in range(self.row):
            self.neuron_map.append([Node(self.input_dim) for j in range(self.col)])
        self.neuron_map = np.array(self.neuron_map)
        
        
    def train(self, x_train, y_train, epoch_size, sigma0):
        def get_winner(x):
            # 回傳得勝類神經元的索引值(tuple), ex: (0, 1)
            min_distance = self.distance_function(x, self.neuron_map[0][0].weight)
            winner = (0, 0)
            for r in range(self.row):
                for c in range(self.col):
                    neuron = self.neuron_map[r][c]
                    distance = self.distance_function(x, neuron.weight) - neuron.get_competition_bias(self.num)
                    winner = (r, c) if distance < min_distance else winner
                    min_distance = min(distance, min_distance)
            
            return winner 
        
        def update_consciences(winner):
            for r in range(self.row):
                for c in range(self.col):
                    y = 0 if (r, c) != winner else 1
                    self.neuron_map[r][c].update_conscience(y)
                    
        def get_neighbors(winner, sigma):
            # 回傳得勝神經元的鄰居(list), ex: [(0, 0), (0, 2)]
            sigma = round(sigma)
            row_lowerbound = np.clip(winner[0] - sigma, 0, self.row).astype(int)
            row_upperbound = np.clip(winner[0] + sigma + 1, 0, self.row).astype(int)
            col_lowerbound = np.clip(winner[1] - sigma, 0, self.col).astype(int)
            col_upperbound = np.clip(winner[1] + sigma + 1, 0, self.col).astype(int)
            return [(r, c) for c in range(col_lowerbound, col_upperbound) for r in range(row_lowerbound, row_upperbound) if (r, c) != winner]           
            
        def update_weights(x, winner, neighbors, sigma, learning_rate):
            # 更新類神經元的權重
            neighbors_nodes = [self.neuron_map[index] for index in neighbors]
            
            self.neuron_map[winner].update_weight(x, learning_rate, 1)
            for neighbor in neighbors:
                distance = self.distance_function(np.array(winner), np.array(neighbor))
                theta = self.kernel_function(distance, sigma)
                self.neuron_map[neighbor].update_weight(x, learning_rate, theta)
              
        
        sigma_tau = time_constant(epoch_size, sigma0)
        for n in range(1, epoch_size+1):
            sigma = self.sigma_decay_function(sigma0, n, sigma_tau)
            learning_rate = self.lr_decay_function(n, epoch_size)
            x_train, y_train = shuffle(x_train, y_train)
            for x, y in zip(x_train, y_train):
                winner = get_winner(x)
                neighbors = get_neighbors(winner, sigma)
                update_weights(x, winner, neighbors, sigma, learning_rate)

            if n % 1000 == 0:
                print('Epoch {}, Learning_rate: {} Sigma: {}\n'.format(n, learning_rate, sigma))
                self.plot_neuron_map()
            
        print('Final Result, Learning_rate: {} Sigma: {}\n'.format(learning_rate, sigma))
        self.plot_neuron_map()
        
    def clustering(self, X):
        
        y_hat = np.zeros((x.shape[0]))
        for i, x in enumerate(X):
            min_distance = self.distance_function(self.neuron_map[r, c].weight, x)
            index = (0, 0)
            for r in range(self.row):
                for c in range(self.col):
                    distance = self.distance_function(self.neuron_map[r, c].weight, x)
                    index = (r, c) if distance < min_distance else index
                    min_distance = min(distance, min_distance)
            
            y_hat[i] = index
        
        return y_hat
            
                    
            
    def plot_neuron_map(self):
        
        colors = ['b', 'g', 'r', 'c', 'm', 'y', 'k', 'w']
        neurons_weights = [neuron.weight for neuron in np.array(self.neuron_map).flatten()]
        

        for weight in neurons_weights:
            plt.scatter(weight[0], weight[1], c='b')

        plt.show()
            
    
        
        
        

In [None]:
class Node:
    def __init__(self, input_dim):
        self.weight = np.random.uniform(0, 1, input_dim)
        self.conscience = 0
    
    def update_weight(self, input_vector, learning_rate, theta):
        self.weight += learning_rate * theta * (input_vector - self.weight)
    
    def update_conscience(self, y, B=0.001):
        self.conscience += B * (y - self.conscience)
        
    def get_competition_bias(self, N, C=10):
        # N: 類神經元總數
        # C: 偏移因子
        return C * (1/N - self.conscience)
        


In [None]:
x_raw, y_raw = get_data('dataset/2cring.txt')
# x_train, y_train = np.random.rand(1000, 2), np.random.rand(1000, 1)
print('size: {}'.format(len(x_raw)))

In [None]:
x_raw = normalize(x_raw)

In [None]:
plot_dataset(x_raw, y_raw)

In [None]:
x_raw, y_raw = shuffle(x_raw, y_raw)

In [None]:
x_train, y_train, x_test, y_test = split(x_raw, y_raw)

In [None]:
row = 1
col = 2
input_dim = len(x_train[0])
epoch_size = row * col * 1000
# learning_rate0 = 0.1
sigma0 = max(row, col) / 2
distance_func = euclidean_distance
kernel_func = gaussian
sigma_decay_func = exponential_decay
lr_decay_func = learning_rate_decay

In [None]:
model = SOM(row, col, input_dim, distance_func, kernel_func, sigma_decay_func, lr_decay_func)

In [None]:
model.train(x_train, y_train, epoch_size, sigma0)

In [None]:
def clustering(model, ):

In [None]:
def get_intensity(row, col, node_r, node_c, neuron_map):
    row_lowerbound = np.clip(node_r - 1, 0, row).astype(int)
    row_upperbound = np.clip(node_r + 1 + 1, 0, row).astype(int)
    col_lowerbound = np.clip(node_c - 1, 0, col).astype(int)
    col_upperbound = np.clip(node_c + 1 + 1, 0, col).astype(int)
    
    max_intensity = -1
    for r in range(row_lowerbound, row_upperbound):
        for c in range(col_lowerbound, col_upperbound):
            max_intensity = max(euclidean_distance(neuron_map[r, c].weight, neuron_map[node_r, node_c].weight), max_intensity)
    return max_intensity
    

In [None]:
weights = np.zeros((model.row * model.col, 2))
intensities = np.zeros((max(model.row, model.col), max(model.row, model.col)))

for r in range(model.row):
    for c in range(model.col):
        weights[r*model.row + c] = model.neuron_map[r, c].weight
        intensities[r, c] = get_intensity(model.row, model.col, r, c, model.neuron_map)
        

In [None]:
intensities

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.image import NonUniformImage
from matplotlib import cm

interp = 'nearest'

x = np.linspace(0, 1, max(model.row, model.col))
y = np.linspace(0, 1, max(model.row, model.col))
z = intensities

print(x.shape)
print(y.shape)
print(z.shape)

fig, axs = plt.subplots(nrows=1, ncols=1, constrained_layout=True)
fig.suptitle('NonUniformImage class', fontsize='large')
ax = axs
im = NonUniformImage(ax, interpolation=interp, extent=(0, 1, 0, 1),
                     cmap=cm.Purples)
im.set_data(x, y, z)
ax.images.append(im)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_title(interp)

plt.show()