In [1]:
import numpy as np
from sklearn.metrics import jaccard_score
from concurrent.futures import ThreadPoolExecutor
from tensorflow.keras.datasets import cifar10
from sklearn.preprocessing import MinMaxScaler
from numba import jit, prange
from sklearn.decomposition import PCA

In [2]:
@jit(nopython=True)
def initialize_weights(x, y, input_len):
    """Build the starting weight grid of the map.

    Args:
        x: size of the first grid axis.
        y: size of the second grid axis.
        input_len: length of the weight vector stored at each grid node.

    Returns:
        Array of shape (x, y, input_len) filled by ``np.random.rand``.
    """
    grid = np.random.rand(x, y, input_len)
    return grid

@jit(nopython=True)
def jaccard_distance(sample1, sample2):
    """Distance based on the ratio of summed minima to summed maxima.

    Computes ``1 - sum(min(a, b)) / (sum(max(a, b)) + 1e-10)`` over the
    element-wise minima and maxima of the two vectors.

    Args:
        sample1: first vector.
        sample2: second vector, same length as ``sample1``.

    Returns:
        The scalar distance.
    """
    tiny = 1e-10  # keeps the denominator away from zero
    lows = np.minimum(sample1, sample2)
    highs = np.maximum(sample1, sample2)
    overlap = np.sum(lows)
    total = np.sum(highs) + tiny
    ratio = overlap / total
    return 1 - ratio

@jit(nopython=True)
def find_bmu(sample, weights):
    """Locate the grid node whose weight vector is closest to ``sample``.

    Closeness is measured with ``jaccard_distance``. The grid is scanned
    row by row and the comparison is strict, so on ties the first node
    encountered wins; if no distance is below infinity, (0, 0) is returned.

    Args:
        sample: input vector.
        weights: array indexed as ``weights[row, col, :]``.

    Returns:
        ``(row, col)`` of the best-matching unit.
    """
    n_rows, n_cols, _ = weights.shape
    best_row = 0
    best_col = 0
    best_dist = np.inf
    for row in range(n_rows):
        for col in range(n_cols):
            candidate = jaccard_distance(sample, weights[row, col, :])
            if candidate < best_dist:
                best_dist = candidate
                best_row = row
                best_col = col
    return (best_row, best_col)

@jit(nopython=True)
def neighborhood(center, radius, x, y):
    """Gaussian influence of a winning node over every node of the grid.

    For each grid position ``(i, j)`` the influence is
    ``exp(-d**2 / (2 * radius**2))`` where ``d`` is the Euclidean grid
    distance to ``center``. The exponent uses the *squared* distance, which
    is what a Gaussian kernel with variance ``radius**2`` requires.

    Args:
        center: ``(row, col)`` of the winning node.
        radius: width of the kernel.
        x: size of the first grid axis.
        y: size of the second grid axis.

    Returns:
        Array of shape (x, y) with values in [0, 1]; the entry at ``center``
        is 1.
    """
    influence = np.zeros((x, y))
    denom = 2 * (radius ** 2)
    for i in range(x):
        for j in range(y):
            # Squared Euclidean distance on the grid; no sqrt is needed
            # because the Gaussian exponent is expressed in d**2.
            dist_sq = (i - center[0]) ** 2 + (j - center[1]) ** 2
            if denom == 0:
                # Zero-width kernel: only the centre itself is influenced.
                if dist_sq == 0:
                    influence[i, j] = 1.0
                else:
                    influence[i, j] = 0.0
            else:
                influence[i, j] = np.exp(-dist_sq / denom)
    return influence

@jit(nopython=True)
def update_weights(sample, weights, bmu, radius, learning_rate):
    """Move every node's weight vector towards ``sample``, in place.

    Each node is shifted by
    ``influence * learning_rate * (sample - node_weights)``, where the
    influence comes from ``neighborhood(bmu, radius, ...)``.

    Args:
        sample: input vector driving the update.
        weights: array indexed as ``weights[row, col, :]``; modified in place.
        bmu: ``(row, col)`` of the best-matching unit for ``sample``.
        radius: neighbourhood width forwarded to ``neighborhood``.
        learning_rate: scalar step size.

    Returns:
        None.
    """
    n_rows, n_cols, _ = weights.shape
    pull = neighborhood(bmu, radius, n_rows, n_cols)
    for row in range(n_rows):
        for col in range(n_cols):
            step = pull[row, col] * learning_rate
            weights[row, col, :] += step * (sample - weights[row, col, :])

def train_som(data, x, y, input_len, sigma, learning_rate, num_iterations):
    """Train a self-organizing map on ``data`` and return its weight grid.

    For every iteration the neighbourhood radius and the learning rate are
    decayed exponentially, then each sample is presented once through
    ``process_sample``.

    Samples are applied strictly one after another. Each update mutates
    ``weights`` in place and the next best-matching-unit search has to see
    that update, so dispatching samples to concurrent workers would race on
    the shared array and make the result depend on thread scheduling.

    Args:
        data: iterable of input vectors, each of length ``input_len``.
        x: size of the first grid axis.
        y: size of the second grid axis.
        input_len: length of each weight vector.
        sigma: initial neighbourhood radius.
        learning_rate: initial learning rate.
        num_iterations: number of passes over ``data``.

    Returns:
        Weight array of shape (x, y, input_len).

    Note:
        The radius schedule divides by ``log(sigma)``. With ``sigma == 1`` the
        logarithm is 0, the time constant becomes ``num_iterations / epsilon``
        and the radius stays practically equal to ``sigma``; with
        ``sigma < 1`` the time constant is negative and the radius grows.
    """
    weights = initialize_weights(x, y, input_len)
    initial_learning_rate = learning_rate
    epsilon = 1e-10  # Small value to prevent division by zero

    # Loop-invariant decay constant of the radius schedule.
    time_constant = num_iterations / (np.log(sigma) + epsilon)

    for iteration in range(num_iterations):
        radius = sigma * np.exp(-iteration / time_constant)
        learning_rate = initial_learning_rate * np.exp(-iteration / (num_iterations + epsilon))

        # Sequential on purpose: see the docstring.
        for sample in data:
            weights = process_sample(sample, weights, radius, learning_rate)

        if iteration % 10 == 0:
            print(f"iteration : {iteration}")
    return weights

def process_sample(sample, weights, radius, learning_rate):
    """Apply one training step for a single sample.

    Finds the best-matching unit of ``sample`` and then lets
    ``update_weights`` adjust ``weights`` in place around it.

    Args:
        sample: input vector.
        weights: weight grid; modified in place.
        radius: neighbourhood width for this step.
        learning_rate: step size for this step.

    Returns:
        The same ``weights`` object that was passed in.
    """
    # Arguments are evaluated left to right, so the BMU search finishes
    # before update_weights starts modifying the grid.
    update_weights(sample, weights, find_bmu(sample, weights), radius, learning_rate)
    return weights

def transform_som(data, weights):
    """Map every row of ``data`` to the grid coordinates of its BMU.

    Args:
        data: 2-D array, one input vector per row.
        weights: trained weight grid as accepted by ``find_bmu``.

    Returns:
        Float array of shape (len(data), 2); row ``i`` holds the
        ``(row, col)`` returned by ``find_bmu`` for ``data[i]``.
    """
    transformed = np.zeros((data.shape[0], 2))
    # Plain range: this function is not numba-compiled, and numba's prange
    # only parallelises inside a jitted function (elsewhere it is just range).
    for i in range(data.shape[0]):
        transformed[i] = find_bmu(data[i], weights)
    return transformed


In [3]:
# Fetch the CIFAR-10 train/test splits as (images, labels) pairs
(x_train, y_train), (x_test, y_test) = cifar10.load_data()

# Keep the first 3000 training examples and the first 500 test examples
num_samples = 3000
x_train, y_train = x_train[:num_samples], y_train[:num_samples]
x_test, y_test = x_test[:500], y_test[:500]

# Flatten each image into one row of 32 * 32 * 3 values
x_train = x_train.reshape(-1, 32 * 32 * 3)
x_test = x_test.reshape(-1, 32 * 32 * 3)

# Rescale features using min/max statistics learned from the training rows
scaler = MinMaxScaler()
x_train = scaler.fit(x_train).transform(x_train)
x_test = scaler.transform(x_test)

# Project onto 50 principal components fitted on the training rows.
# NOTE(review): the later cells shown here train and predict on
# x_train / x_test, not on these PCA projections -- confirm that is intended.
pca = PCA(n_components=50)
x_train_pca = pca.fit_transform(x_train)
x_test_pca = pca.transform(x_test)

In [4]:
# Hyper-parameters of the self-organizing map
x, y = 10, 10              # grid size along each axis
input_len = 32 * 32 * 3    # length of every weight vector
sigma = 1.0                # initial neighbourhood radius
learning_rate = 0.5        # initial learning rate
num_iterations = 100       # number of passes over the training data

In [5]:
# Fit the map on the scaled training rows
weights = train_som(
    data=x_train,
    x=x,
    y=y,
    input_len=input_len,
    sigma=sigma,
    learning_rate=learning_rate,
    num_iterations=num_iterations,
)

# Look up the best-matching unit of every training row
transformed_train = transform_som(data=x_train, weights=weights)

print("Transformed Train Data:", transformed_train)

iteration : 0
iteration : 10
iteration : 20
iteration : 30
iteration : 40
iteration : 50
iteration : 60
iteration : 70
iteration : 80
iteration : 90
Transformed Train Data: [[4. 8.]
 [3. 0.]
 [6. 0.]
 ...
 [0. 9.]
 [0. 2.]
 [0. 0.]]


In [6]:
def predict(data, weights, train_labels, x, y, train_data=None):
    """Label each row of ``data`` by majority vote on its best-matching unit.

    Every training row is assigned to its BMU once; each BMU then gets the
    most frequent label among its training rows (``np.bincount(...).argmax()``,
    so ties go to the smallest label). A query row receives the label of the
    BMU it maps to, or -1 when no training row landed on that node.

    Args:
        data: 2-D array of query vectors, one per row.
        weights: trained weight grid.
        train_labels: non-negative integer label array; its first axis
            indexes the training rows.
        x: unused; kept so existing calls keep working.
        y: unused; kept so existing calls keep working.
        train_data: training vectors aligned with ``train_labels``. When
            omitted, the notebook-level ``x_train`` is used.

    Returns:
        1-D integer array with one label per row of ``data``. The dtype is
        ``train_labels.dtype`` unless that is unsigned, in which case int64
        is used so the -1 sentinel is representable.
    """
    if train_data is None:
        # Fallback to the notebook global used by earlier versions of this cell.
        train_data = x_train

    # Group the training labels by BMU a single time; the assignment does not
    # depend on the query row, so it must not be recomputed per query.
    labels_by_node = {}
    for k in range(train_labels.shape[0]):
        node = find_bmu(train_data[k], weights)
        key = (int(node[0]), int(node[1]))
        labels_by_node.setdefault(key, []).append(train_labels[k])

    majority_by_node = {
        key: np.bincount(np.array(members).flatten()).argmax()
        for key, members in labels_by_node.items()
    }

    # An unsigned dtype (e.g. uint8 labels) cannot hold the -1 sentinel.
    if np.issubdtype(train_labels.dtype, np.unsignedinteger):
        out_dtype = np.int64
    else:
        out_dtype = train_labels.dtype

    bmu_indices = transform_som(data, weights)
    predicted_labels = np.full(bmu_indices.shape[0], -1, dtype=out_dtype)
    for idx, (i, j) in enumerate(bmu_indices):
        # transform_som stores coordinates as floats; normalise to int keys.
        predicted_labels[idx] = majority_by_node.get((int(i), int(j)), -1)

    return predicted_labels


In [None]:
# Assign a label to every test row via the trained map
predicted_labels = predict(
    data=x_test,
    weights=weights,
    train_labels=y_train,
    x=x,
    y=y,
)

In [None]:
def accuracy(predicted_labels, true_labels):
    """Fraction of positions where the predicted label equals the true label.

    Both inputs are flattened before comparison, so a column vector of shape
    (n, 1) and a flat vector of shape (n,) are compared position by position
    instead of being broadcast into an (n, n) table.

    Args:
        predicted_labels: array-like of predicted labels.
        true_labels: array-like of reference labels with the same number of
            elements.

    Returns:
        Mean of the element-wise equality as a float.

    Raises:
        ValueError: if the two inputs hold a different number of elements.
    """
    predicted = np.asarray(predicted_labels).ravel()
    expected = np.asarray(true_labels).ravel()
    if predicted.shape != expected.shape:
        raise ValueError(
            f"label count mismatch: {predicted.shape[0]} predicted vs {expected.shape[0]} true"
        )
    return np.mean(predicted == expected)

In [None]:
# Score the predictions against the flattened test labels
test_accuracy = accuracy(
    predicted_labels=predicted_labels,
    true_labels=y_test.flatten(),
)
print("Test Accuracy:", test_accuracy)