# In [17]:
from math import sqrt
from random import randrange, seed
from csv import reader


# Fix the seed of the `random` module so the `randrange` draws used for fold
# assignment and codebook initialisation are repeatable between runs.
seed(42)


def load_csv(filepath):
  """Read a CSV file into a list of rows, skipping empty records.

  Args:
    filepath: Path of the CSV file to read.

  Returns:
    A list with one list of string fields per non-empty CSV record.
  """
  # newline="" hands line-ending handling to the csv module, so line breaks
  # embedded in quoted fields are preserved exactly as written in the file.
  with open(filepath, "r", newline="") as file:
    return [row for row in reader(file) if row]


def str_col_to_float(dataset, col):
  """Convert the string stored at index `col` of every row to a float, in place.

  Args:
    dataset: List of rows; each row is mutated.
    col: Index of the column to convert.
  """
  for record in dataset:
    raw_value = record[col]
    # Surrounding whitespace is removed before parsing the number.
    record[col] = float(raw_value.strip())


def str_col_to_int(dataset, col):
  """Replace the values of column `col` with integer codes, in place.

  Codes are assigned in order of first appearance in `dataset`, so the
  mapping is the same on every run for the same input.

  Args:
    dataset: List of rows; each row is mutated.
    col: Index of the column to encode.

  Returns:
    A dict mapping each distinct original value of the column to its code.
  """
  # Collect the distinct values from the requested column (not always the
  # last one); dict.fromkeys de-duplicates while keeping insertion order.
  distinct_vals = dict.fromkeys(row[col] for row in dataset)
  class_lookup = {val: code for code, val in enumerate(distinct_vals)}

  for row in dataset:
    row[col] = class_lookup[row[col]]

  return class_lookup


def cross_validation_split(dataset, n_folds):
  """Randomly partition `dataset` into `n_folds` folds of equal size.

  Each fold holds int(len(dataset) / n_folds) rows drawn without replacement
  using `randrange`; rows left over after the last fold are not used. The
  input list itself is not modified (rows are shared, not copied).

  Args:
    dataset: List of rows to split.
    n_folds: Number of folds to create.

  Returns:
    A list of `n_folds` lists of rows.
  """
  remaining = dataset.copy()
  rows_per_fold = int(len(dataset) / n_folds)
  folds = []

  for _ in range(n_folds):
    # Draw one random remaining row per slot of the current fold.
    picked = [remaining.pop(randrange(len(remaining))) for _slot in range(rows_per_fold)]
    folds.append(picked)

  return folds


def accuracy_metric(actual, predicted):
  """Return the percentage of positions where `predicted` equals `actual`.

  Args:
    actual: Sequence of expected labels.
    predicted: Sequence of predicted labels, same length as `actual`.

  Returns:
    The share of matching positions as a float between 0 and 100.
  """
  assert len(actual) == len(predicted)

  hits = sum(1 for truth, guess in zip(actual, predicted) if truth == guess)

  return hits / float(len(actual)) * 100


def evaluate_algorithm(dataset, algorithm, n_folds, *args):
  """Score `algorithm` on `dataset` with k-fold cross-validation.

  Each fold is held out once. The rows of all other folds form the training
  set, and the held-out rows are passed as the test set with their last
  element (the label) replaced by None so the algorithm cannot read it.

  Args:
    dataset: List of rows whose last element is the label.
    algorithm: Callable invoked as algorithm(train, test, *args) that returns
      one prediction per test row.
    n_folds: Number of cross-validation folds.
    *args: Extra positional arguments forwarded to `algorithm`.

  Returns:
    A list with one accuracy percentage per fold.
  """
  folds = cross_validation_split(dataset=dataset, n_folds=n_folds)
  scores = []

  for held_out_idx, fold in enumerate(folds):
    # Select the training folds by position (not by list equality) and
    # flatten them in a single linear pass.
    train = [
      row
      for idx, other_fold in enumerate(folds)
      if idx != held_out_idx
      for row in other_fold
    ]

    # Work on copies so the labels in the original rows stay intact.
    test = []
    for row in fold:
      masked_row = list(row)
      masked_row[-1] = None
      test.append(masked_row)

    predicted = algorithm(train, test, *args)
    actual = [row[-1] for row in fold]
    scores.append(accuracy_metric(actual=actual, predicted=predicted))

  return scores


def euclidean_distance(row1, row2):
  """Return the Euclidean distance between two rows, ignoring the last element.

  Args:
    row1: First row; its last element is skipped.
    row2: Second row, same length as `row1`; its last element is skipped.

  Returns:
    The square root of the summed squared differences of all but the last
    element.
  """
  assert len(row1) == len(row2)

  squared_total = 0.0
  # The final slot is left out of the distance computation.
  for left, right in zip(row1[:-1], row2[:-1]):
    squared_total += (left - right) ** 2

  return sqrt(squared_total)


def get_best_matching_unit(codebooks, row):
  """Return the codebook vector closest to `row`.

  Distance is measured with `euclidean_distance`. When several codebooks are
  equally close, the one that comes first in `codebooks` is returned.

  Args:
    codebooks: Non-empty list of codebook vectors.
    row: Row to match against the codebooks.

  Returns:
    The element of `codebooks` with the smallest distance to `row` (the
    same object, not a copy).

  Raises:
    IndexError: If `codebooks` is empty.
  """
  if not codebooks:
    # Same exception type the function has always raised for empty input.
    raise IndexError("codebooks must contain at least one codebook vector")

  # A single pass is enough to find the minimum; no need to sort every
  # distance. min() returns the first minimal item, matching a stable sort.
  return min(
    codebooks,
    key=lambda codebook: euclidean_distance(row1=codebook, row2=row),
  )


def predict(codebooks, row):
  """Return the last element of the codebook vector that best matches `row`.

  Args:
    codebooks: List of codebook vectors.
    row: Row to classify.

  Returns:
    The value stored in the last slot of the best matching codebook.
  """
  closest = get_best_matching_unit(codebooks=codebooks, row=row)
  label = closest[-1]
  return label


def random_codebook(train):
  """Build a codebook vector by sampling each position from a random row.

  For every column index (including the last one) a training row is picked
  independently with `randrange` and its value at that index is copied.

  Args:
    train: Non-empty list of rows of equal length.

  Returns:
    A new list with as many elements as `train[0]`.
  """
  row_count = len(train)
  codebook = []

  for column in range(len(train[0])):
    donor = train[randrange(row_count)]
    codebook.append(donor[column])

  return codebook


def train_codebooks(train, n_codebooks, l_rate, n_epochs):
  """Fit a set of codebook vectors to the training rows.

  Codebooks start as random samples of the training data. For every training
  row in every epoch, the best matching codebook is moved towards the row
  when their last elements (labels) are equal, and away from it otherwise.

  Args:
    train: List of training rows whose last element is the label.
    n_codebooks: Number of codebook vectors to create.
    l_rate: Initial learning rate.
    n_epochs: Number of passes over `train`.

  Returns:
    The list of trained codebook vectors.
  """
  codebooks = [random_codebook(train=train) for _ in range(n_codebooks)]

  for epoch in range(n_epochs):
    # Linearly decay the learning rate from l_rate towards zero.
    rate = l_rate * (1 - (epoch / float(n_epochs)))
    for row in train:
      bmu = get_best_matching_unit(codebooks=codebooks, row=row)
      # The label slot is never modified below, so the direction of the
      # update can be decided once per row instead of once per feature.
      step = rate if bmu[-1] == row[-1] else -rate
      for i in range(len(row) - 1):
        bmu[i] += step * (row[i] - bmu[i])

  return codebooks


def learning_vector_quantization(train, test, n_codebooks, l_rate, n_epochs):
  """Train codebooks on `train` and predict a label for every row of `test`.

  Args:
    train: List of training rows whose last element is the label.
    test: List of rows to classify.
    n_codebooks: Number of codebook vectors to train.
    l_rate: Initial learning rate.
    n_epochs: Number of training epochs.

  Returns:
    A list with one predicted label per row of `test`, in order.
  """
  trained = train_codebooks(
    train=train,
    n_codebooks=n_codebooks,
    l_rate=l_rate,
    n_epochs=n_epochs,
  )

  return [predict(codebooks=trained, row=sample) for sample in test]


# Load the dataset; load_csv returns every field as a string.
filepath = "../datasets/ionosphere.csv"
dataset = load_csv(filepath=filepath)
# Convert every column except the last one to float.
for i in range(len(dataset[0]) - 1):
  str_col_to_float(dataset=dataset, col=i)
# Replace the values of the last column with integer codes.
str_col_to_int(dataset=dataset, col=len(dataset[0]) - 1)
# Run settings: n_folds goes to evaluate_algorithm itself; the other three are
# forwarded through its *args to learning_vector_quantization.
l_rate = 0.3
n_folds = 5
n_epochs = 50
n_codebooks = 20
scores = evaluate_algorithm(dataset, learning_vector_quantization, n_folds, n_codebooks, l_rate, n_epochs)
# NOTE(review): `display` is neither defined nor imported in the visible code;
# presumably the IPython/Jupyter builtin — confirm before running as a script.
display(f"scores={scores}")
display(f"mean accuracy={sum(scores) / len(scores)}")