In [61]:
from modules.som import SOM
from modules.model_picker import model_picker
import pandas as pd
import numpy as np
import time
from modules.utils import euc_distance
import math

In [62]:
df = pd.read_csv("data.csv", header=None)
X = df.iloc[:, :-1].values
y = df.iloc[:,-1].values
X.shape, y.shape

((150, 4), (150,))

In [63]:
# Function to normalize a column of data
def normalize_column(data, column_index):
    column = data[:, column_index]
    min_val = np.min(column)
    max_val = np.max(column)
    normalized_column = (column - min_val) / (max_val - min_val)
    return normalized_column


class som_classification:
    def __init__(self, m:int, n:int, X:np.array, y: np.array,) -> None:
        # preprocess X
        # Normalize each column
        for col in range(X.shape[1]):
            X[:, col] = normalize_column(X, col)
        y = np.reshape(y, -1)
        self.total_neurons_representation = m*n
        self.neuron_shape = (m,n)
        self.classes = np.unique(y)
        self.dataset = []
        self.all_dataset = X
        self.true = y
        self.true_encoded = one_hot_encode(y)
        self.best_models = None
        self.trained = False
        for c in self.classes:
            X_filtered = X[(y == c)]
            self.dataset.append((X_filtered, c))

    
    def predict(self, X: np.array):
        predictions = []

        for data in X:
            list_samples = [
                model.neurons[model.index_bmu(np.array([data]))[0]][model.index_bmu(np.array([data]))[1]]
                for model in self.models
            ]
            #print(self.models)
            # Calculate distances and sum
            distances = [math.exp(1 / (euc_distance(data, sample) + 1)) for sample in list_samples]
            dist_sum = sum(distances)
            
            # Normalize distances
            normalized_distances = [dist / dist_sum for dist in distances]
            
            predictions.append(normalized_distances)
        
        return np.array(predictions)
    
    def fit(self, epoch: int=10):
        # find the total classes
        
        if not self.trained:
            self.trained = True
            self.models = []
            for data_points, label in self.dataset:
                som = SOM(m=self.neuron_shape[0], n=self.neuron_shape[1], dim=X.shape[1], 
                        initiate_method='SOM++', learning_rate=1.5, 
                        neighbour_rad=2.0, distance_function='euclidean', max_iter=None)
                som.fit(X = data_points, epoch=epoch, verbose=False)
                self.models.append(som)
        else:
            for datasets, som_model in zip(self.dataset, self.models):
                data_points, label = datasets
                som_model.fit(X=data_points, epoch=epoch, verbose=False)
        
    def eval(self):
        pred = self.predict(self.all_dataset)
        #print(pred)
        mean_squared_error = np.mean(np.sum((self.true_encoded - pred)**2, axis=1))
        accuracy = np.sum([1 if self.classes[np.argmax(pred_data)] == true_data else 0 for pred_data, true_data in zip(pred, self.true)])/len(pred)
        return (mean_squared_error, accuracy)
    
    def train(self, retry:int=10, train_batch:int=24):
        self.fit(1)
        best_mse = 1e10
        best_acc = 0
        while True and retry > 0:
            mse, acc = self.eval()
            self.fit(train_batch)
            print(mse, acc)
            if mse < best_mse or acc > best_acc:
                if mse < best_mse:
                    best_mse = mse
                if acc > best_acc:
                    best_acc = acc
                self.best_models = self.models
                retry = 10
            else:
                retry -= 1
        self.models = self.best_models
        return self.eval()

In [132]:
import numpy as np
import math

def normalize_column(data, column_index):
    column = data[:, column_index]
    min_val = np.min(column)
    max_val = np.max(column)
    normalized_column = (column - min_val) / (max_val - min_val)
    return normalized_column

def one_hot_encode(y):
    classes = np.unique(y)
    encoded = np.zeros((y.size, classes.size))
    for idx, label in enumerate(y):
        encoded[idx, np.where(classes == label)[0][0]] = 1
    return encoded

class som_classification:
    def __init__(self, m: int, n: int, X: np.array, y: np.array) -> None:
        # Normalize each column
        for col in range(X.shape[1]):
            X[:, col] = normalize_column(X, col)
        
        y = np.reshape(y, -1)
        self.total_neurons_representation = m * n
        self.neuron_shape = (m, n)
        self.classes = np.unique(y)
        self.dataset = []
        self.all_dataset = X
        self.true = y
        self.true_encoded = one_hot_encode(y)
        self.best_models = None
        self.trained = False
        self.weights = np.random.rand(len(self.classes)).astype(np.float64)
        for c in self.classes:
            X_filtered = X[(y == c)]
            self.dataset.append((X_filtered, c))
        # Initialize weights for the models
        self.weights = np.ones(len(self.classes))

    def predict(self, X: np.array):
        predictions = []

        for data in X:
            list_samples = [
                model.neurons[model.index_bmu(np.array([data]))[0]][model.index_bmu(np.array([data]))[1]]
                for model in self.models
            ]
            distances = [math.exp(1 / (euc_distance(data, sample) + 1)) for sample in list_samples]
            dist_sum = sum(distances)
            normalized_distances = np.array([dist / dist_sum for dist in distances])
            # Apply weights to normalized distances
            weighted_distances = [w * d for w, d in zip(self.weights, normalized_distances)]
            predictions.append(weighted_distances)
        
        return np.array(predictions)
    
    def update_weights(self, learning_rate=0.01, iterations=100):
        for _ in range(iterations):
            # Calculate predictions
            pred = self.predict(self.all_dataset)
            error = self.true_encoded - pred

            # Calculate gradient
            gradient = np.zeros_like(self.weights)
            for i, (p, t) in enumerate(zip(pred, self.true_encoded)):
                for j in range(len(self.weights)):
                    gradient[j] += -2 * error[i][j] * p[j]

            # Update weights
            self.weights -= learning_rate * gradient
        
        # Normalize weights to sum to 1 (optional, depending on your requirements)
        self.weights = self.weights / np.sum(self.weights)
    def fit(self, epoch: int = 10, initiate_method = 'kde', learning_rate=1.5, distance_function = "euclidean"):
        if not self.trained:
            self.trained = True
            self.models = []
            for data_points, label in self.dataset:
                som = SOM(m=self.neuron_shape[0], n=self.neuron_shape[1], dim=self.all_dataset.shape[1], 
                          initiate_method=initiate_method, learning_rate=learning_rate, 
                          neighbour_rad=2.0, distance_function=distance_function, max_iter=None)
                som.fit(X=data_points, epoch=epoch, verbose=False)
                self.models.append(som)
        else:
            for datasets, som_model in zip(self.dataset, self.models):
                data_points, label = datasets
                som_model.fit(X=data_points, epoch=epoch, verbose=False)
        # Gradient descent to optimize weights
        
        # Gradient descent to optimize weights
        self.update_weights( iterations=epoch)
        
    def eval(self):
        pred = self.predict(self.all_dataset)
        mean_squared_error = np.mean(np.sum((self.true_encoded - pred) ** 2, axis=1))
        accuracy = np.sum([1 if self.classes[np.argmax(pred_data)] == true_data else 0 for pred_data, true_data in zip(pred, self.true)]) / len(pred)
        return mean_squared_error, accuracy
    
    def train(self, retry: int = 10, train_batch: int = 24):
        self.fit(1)
        best_mse = float('inf')
        best_acc = 0
        while retry > 0:
            mse, acc = self.eval()
            self.fit(train_batch)
            print(mse, acc)
            if mse < best_mse or acc > best_acc:
                if mse < best_mse:
                    best_mse = mse
                if acc > best_acc:
                    best_acc = acc
                self.best_models = self.models
                retry = 10
            else:
                retry -= 1
        self.models = self.best_models
        return self.eval(), (best_mse, best_acc)


In [133]:
som_class = som_classification(2,2, X, y)
#print(som_class.all_dataset)
eval, best = som_class.train(retry=16, train_batch=128)
eval, best

0.8086177128507226 0.3333333333333333
0.8049070224036231 0.6333333333333333
0.7936725000983668 0.7733333333333333
0.7840864858546823 0.8266666666666667
0.7765698158479089 0.8866666666666667
0.774912544893786 0.86
0.7754221660856325 0.8933333333333333
0.7768519694573246 0.88
0.7772944440193776 0.8666666666666667
0.7786481021810022 0.8933333333333333
0.7787539278192435 0.8866666666666667
0.7775933008965169 0.9
0.7813669862669346 0.8933333333333333
0.779090104822294 0.8666666666666667
0.7801740327365236 0.8666666666666667
0.7795755066067299 0.8733333333333333
0.78159088981332 0.88
0.7807210031533034 0.9066666666666666
0.7814660805643204 0.8866666666666667
0.7810690571659978 0.9133333333333333
0.7805321032067873 0.9066666666666666
0.7801789733273995 0.9066666666666666
0.7807720544722719 0.8866666666666667
0.7797154906162814 0.8933333333333333
0.7787686314641731 0.8933333333333333
0.7775278328052897 0.8866666666666667
0.7773558810500873 0.8933333333333333
0.7756061814866255 0.88666666666666

((0.7718605527125989, 0.94), (0.7709163327647971, 0.94))