# LVQ implementation

## Load data

In [1]:
# Code to read file into Colaboratory:!pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials# Authenticate and create the PyDrive client.
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

In [2]:
data_set_id = '1fL7_D_ISApzuFQcW5hNESuleCvWbdotq'
downloaded = drive.CreateFile({'id':data_set_id})
downloaded.GetContentFile('data set.txt')

In [3]:
import numpy as np

In [233]:
data_set = []
label_set = []

f = open("data set.txt", "r")
for line in f.readlines():
  temp = []
  for item in (line.split(",")):
    temp.append(float(item))
  data_set.append(temp[:-1])
  label_set.append(int(temp[-1]))

data_set = np.array(data_set)
label_set = np.array(label_set)
N = len(data_set)
print(N)

1372


## Pre-process data

In [234]:
data_set_normalized = (data_set - np.min(data_set)) / (np.max(data_set) - np.min(data_set))

In [235]:
data_set_normalized

array([[0.54872005, 0.70785003, 0.34591883, 0.42037539],
       [0.57787732, 0.69211842, 0.35691866, 0.3883535 ],
       [0.55642971, 0.35124998, 0.49517515, 0.43783379],
       ...,
       [0.31617167, 0.00992098, 0.98945758, 0.3468715 ],
       [0.32205801, 0.17004148, 0.825416  , 0.39402533],
       [0.35429094, 0.41371776, 0.51914954, 0.47217867]])

In [236]:
# Shuffle data
data = list(zip(data_set_normalized.tolist(), label_set.tolist()))
np.random.shuffle(data)
data = list(zip(*data))
data_set_normalized, label_set = np.array(data[0]), np.array(data[1])

In [247]:
label0_indexes = np.array(np.where(label_set == 0)[0])
label1_indexes = np.array(np.where(label_set == 1)[0])

In [248]:
class0_number = len(label0_indexes)
class1_number = len(label1_indexes)

In [249]:
print("Class zero number:", class0_number)
print("Class one number:", class1_number)

Class zero number: 762
Class one number: 610


In [252]:
class0_training_size = int(class0_number * 0.9)
class1_training_size = int(class1_number * 0.9)
x_train = []
y_train = []
x_val = []
y_val = []
for i in label0_indexes[:class0_training_size]:
  x_train.append(data_set_normalized[i])
for i in label1_indexes[:class1_training_size]:
  x_train.append(data_set_normalized[i])

for i in label0_indexes[class0_training_size:]:
  x_val.append(data_set_normalized[i])
for i in label1_indexes[:class1_training_size]:
  x_val.append(data_set_normalized[i])

x_train = np.array(x_train)
x_val = np.array(x_val)
y_train = np.array(class0_training_size * [0] + class1_training_size * [1])
y_val = np.array((class0_number - class0_training_size) * [0] + (class1_number - class1_training_size) * [1])

In [253]:
# Shuffle data
data = list(zip(x_train.tolist(), y_train.tolist()))
np.random.shuffle(data)
data = list(zip(*data))
x_train, y_train = np.array(data[0]), np.array(data[1])

data = list(zip(x_val.tolist(), y_val.tolist()))
np.random.shuffle(data)
data = list(zip(*data))
x_val, y_val = np.array(data[0]), np.array(data[1])

## LVQ1

In [406]:
class LVQ1:

  def __init__(self, n_weights, lr=0.03):
    self.lr = lr
    self.n_updates_max = 0 # For decrease learning rate
    self.n_updates = 0 # For decrease learning rate
    self.n_correct_prediction = 0 # For accuracy
    self.n_weights = n_weights # n_weights is even
    self.weights = []

  def winner(self, x, y): # Find winner weights
    x = np.expand_dims(x, axis=0)
    distance = np.linalg.norm(x - self.weights, axis=1)
    return int(distance.argmin())
  
  def update(self, x, y, J): # Update weights according to conditions applied
    self.n_updates += 1
    if y == J:
      self.weights[J] = self.weights[J] + self.lr * (x - self.weights[J])
    else:
      self.weights[J] = self.weights[J] - self.lr * (x - self.weights[J])
    self.lr = self.lr * (1 - self.n_updates / self.n_updates_max)
  
  def choose_weights(self, x_train): # Here we have two classes, then we choose half from class0 and half from class1 as LVQ weights.
    n_train_0 = x_train[y_train == 0].shape[0]
    n_train_1 = x_train[y_train == 1].shape[0]
    x_train_0 = x_train[y_train == 0]
    x_train_1 = x_train[y_train == 1]
    for i in range(self.n_weights // 2):
      self.weights.append(x_train_0[int(i * n_train_0 / (self.n_weights / 2))])
    for i in range(self.n_weights // 2):
      self.weights.append(x_train_1[int(i * n_train_1 / (self.n_weights / 2))])
    self.weights = np.array(self.weights)
  
  def train(self, x_train, y_train, x_val, y_val, epochs):
    self.choose_weights(x_train)
    n_train = x_train.shape[0]
    self.n_updates_max = n_train * epochs
    for e in range(epochs):
      self.n_correct_prediction = 0
      for i in range(n_train):
        x, y = x_train[i], y_train[i]
        J = self.winner(x, y)
        J = int(J > len(self.weights) / 2)
        if y == J:
          self.n_correct_prediction += 1
        self.update(x, y, J)
      
      train_acc = (self.n_correct_prediction / n_train) * 100
      print("training acc: {:.2f} %".format(train_acc))
      self.evaluate(x_val, y_val)
      
  def evaluate(self, x_val, y_val):
    n_val = x_val.shape[0]
    self.n_correct_prediction = 0
    for i in range(n_val):
      x, y = x_val[i], y_val[i]
      J = self.winner(x, y)
      J = int(J > len(self.weights) / 2)
      if y == J:
        self.n_correct_prediction += 1
    
    val_acc = (self.n_correct_prediction / n_val) * 100
    print("val acc: {:.2f} %".format(val_acc))


In [418]:
lvq1_model = LVQ1(60) # 60 weights
lvq1_model.train(x_train=x_train, y_train=y_train, x_val=x_val, y_val=y_val, epochs=8)

training acc: 95.06 %
val acc: 96.38 %
training acc: 95.06 %
val acc: 96.38 %
training acc: 95.06 %
val acc: 96.38 %
training acc: 95.06 %
val acc: 96.38 %
training acc: 95.06 %
val acc: 96.38 %
training acc: 95.06 %
val acc: 96.38 %
training acc: 95.06 %
val acc: 96.38 %
training acc: 95.06 %
val acc: 96.38 %


## LVQ2.1

In [419]:
class LVQ2_1:

  def __init__(self, n_weights, epsilon=0.1, lr=0.01):
    self.lr = lr
    self.epsilon = epsilon
    self.update_flag = False
    self.n_updates_max = 0 # For decrease learning rate
    self.n_updates = 0 # For decrease learning rate
    self.n_correct_prediction = 0 # For accuracy
    self.n_weights = n_weights # n_weights is even
    self.weights = []
  
  def winner(self, x, y): # Find winner weights and check for update if required according to LVQ 2.1 algorithm
    x = np.expand_dims(x, axis=0)
    distance = np.linalg.norm(x - self.weights, axis=1)
    sorted_argumets = distance.argsort()
    winner_weights = sorted_argumets[:2]
    winner_distance, runner_up_distance = distance[winner_weights]

    if runner_up_distance == 0 or winner_distance == 0: # Prevent from division by zero
      self.update_flag = False
      return winner_weights
    if min(winner_distance / runner_up_distance, runner_up_distance / winner_distance) > (1 - self.epsilon) and\
    max(winner_distance / runner_up_distance, runner_up_distance / winner_distance) < (1 + self.epsilon):
      self.update_flag = True
    else:
      self.update_flag = False
    return winner_weights
  
  def update(self, x, y, first_winner, second_winner):# Update weights according to conditions applied
    if self.update_flag:
      if (y == first_winner and y != second_winner) or (y == second_winner and y != first_winner): # One of them from correct class and another does not
        self.n_updates += 1
        # do not care whether is closer to correct class or to incorrect class
        self.weights[first_winner] = self.weights[first_winner] + self.lr * (x - self.weights[first_winner])
        self.weights[second_winner] = self.weights[second_winner] - self.lr * (x - self.weights[second_winner])
        self.lr = self.lr * (1 - self.n_updates / self.n_updates_max)
  
  def choose_weights(self, x_train): # Here we have two classes, then we choose half from class0 and half from class1 as LVQ weights.
    n_train_0 = x_train[y_train == 0].shape[0]
    n_train_1 = x_train[y_train == 1].shape[0]
    x_train_0 = x_train[y_train == 0]
    x_train_1 = x_train[y_train == 1]
    for i in range(self.n_weights // 2):
      self.weights.append(x_train_0[int(i * n_train_0 / (self.n_weights / 2))])
    for i in range(self.n_weights // 2):
      self.weights.append(x_train_1[int(i * n_train_1 / (self.n_weights / 2))])
    self.weights = np.array(self.weights)
  
  def train(self, x_train, y_train, x_val, y_val, epochs):
    self.choose_weights(x_train)
    n_train = x_train.shape[0]
    self.n_updates_max = n_train * epochs
    for e in range(epochs):
      self.n_correct_prediction = 0
      for i in range(n_train):
        x, y = x_train[i], y_train[i]
        winner_weights = self.winner(x, y)
        first_winner = int(winner_weights[0] > len(self.weights) / 2)
        second_winner = int(winner_weights[1] > len(self.weights) / 2)
        if y == first_winner:
          self.n_correct_prediction += 1
        self.update(x, y, first_winner, second_winner)
      
      train_acc = (self.n_correct_prediction / n_train) * 100
      print("training acc: {:.2f} %".format(train_acc))
      self.evaluate(x_val, y_val)
      
  def evaluate(self, x_val, y_val):
    n_val = x_val.shape[0]
    self.n_correct_prediction = 0
    for i in range(n_val):
      x, y = x_val[i], y_val[i]
      winner_weights = self.winner(x, y)
      first_winner = int(winner_weights[0] > len(self.weights) / 2)
      second_winner = int(winner_weights[1] > len(self.weights) / 2)
      if y == first_winner:
        self.n_correct_prediction += 1
    
    val_acc = (self.n_correct_prediction / n_val) * 100
    print("val acc: {:.2f} %".format(val_acc))
  

In [420]:
lvq2_1_model = LVQ2_1(60) # 60 weights and small value for learning rate and relatively low epochs
lvq2_1_model.train(x_train=x_train, y_train=y_train, x_val=x_val, y_val=y_val, epochs=5) 

training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %


## LVQ3

In [410]:
class LVQ3:

  def __init__(self, n_weights, epsilon=0.2, lr=0.03):
    self.lr = lr
    self.epsilon = epsilon
    self.update_flag = False
    self.n_updates_max = 0 # For decrease learning rate
    self.n_updates = 0 # For decrease learning rate
    self.n_correct_prediction = 0 # For accuracy
    self.m = np.random.uniform(0.1, 0.5)
    self.n_weights = n_weights # n_weights is even
    self.weights = []
  
  def winner(self, x, y): # Find winner weights and check for update if required according to LVQ 3 algorithm
    x = np.expand_dims(x, axis=0)
    distance = np.linalg.norm(x - self.weights, axis=1)
    sorted_argumets = distance.argsort()
    winner_weights = sorted_argumets[:2]
    winner_distance, runner_up_distance = distance[winner_weights]

    if runner_up_distance == 0 or winner_distance == 0: # Prevent from division by zero
      self.update_flag = False
      return winner_weights
    if min(winner_distance / runner_up_distance, runner_up_distance / winner_distance) > (1 - self.epsilon) * (1 + self.epsilon):
      self.update_flag = True
    else:
      self.update_flag = False
    return winner_weights
  
  def update(self, x, y, first_winner, second_winner):# Update weights according to conditions applied
    if self.update_flag:
      self.n_updates += 1
      self.lr = self.lr * (1 - self.n_updates / self.n_updates_max)
      if y == first_winner and y == second_winner:
        self.beta = self.m * self.lr
        self.weights[first_winner] = self.weights[first_winner] + self.beta * (x - self.weights[first_winner])
        self.weights[second_winner] = self.weights[second_winner] + self.beta * (x - self.weights[second_winner])
        return
      if y == first_winner:
        self.weights[first_winner] = self.weights[first_winner] + self.lr * (x - self.weights[first_winner])
        self.weights[second_winner] = self.weights[second_winner] - self.lr * (x - self.weights[second_winner])
  
  def choose_weights(self, x_train): # Here we have two classes, then we choose half from class0 and half from class1 as LVQ weights.
    n_train_0 = x_train[y_train == 0].shape[0]
    n_train_1 = x_train[y_train == 1].shape[0]
    x_train_0 = x_train[y_train == 0]
    x_train_1 = x_train[y_train == 1]
    for i in range(self.n_weights // 2):
      self.weights.append(x_train_0[int(i * n_train_0 / (self.n_weights / 2))])
    for i in range(self.n_weights // 2):
      self.weights.append(x_train_1[int(i * n_train_1 / (self.n_weights / 2))])
    self.weights = np.array(self.weights)
  
  def train(self, x_train, y_train, x_val, y_val, epochs):
    self.choose_weights(x_train)
    n_train = x_train.shape[0]
    self.n_updates_max = n_train * epochs
    for e in range(epochs):
      self.n_correct_prediction = 0
      for i in range(n_train):
        x, y = x_train[i], y_train[i]
        winner_weights = self.winner(x, y)
        first_winner = int(winner_weights[0] > len(self.weights) / 2)
        second_winner = int(winner_weights[1] > len(self.weights) / 2)
        if y == first_winner:
          self.n_correct_prediction += 1
        self.update(x, y, first_winner, second_winner)
      
      train_acc = (self.n_correct_prediction / n_train) * 100
      print("training acc: {:.2f} %".format(train_acc))
      self.evaluate(x_val, y_val)
      
  def evaluate(self, x_val, y_val):
    n_val = x_val.shape[0]
    self.n_correct_prediction = 0
    for i in range(n_val):
      x, y = x_val[i], y_val[i]
      winner_weights = self.winner(x, y)
      first_winner = int(winner_weights[0] > len(self.weights) / 2)
      second_winner = int(winner_weights[1] > len(self.weights) / 2)
      if y == first_winner:
        self.n_correct_prediction += 1
    
    val_acc = (self.n_correct_prediction / n_val) * 100
    print("val acc: {:.2f} %".format(val_acc))
  

In [417]:
lvq3_model = LVQ3(60) # 60 weights
lvq3_model.train(x_train=x_train, y_train=y_train, x_val=x_val, y_val=y_val, epochs=8)

training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %
training acc: 94.98 %
val acc: 96.38 %


[reffrence for LVQ algorithm](http://ccy.dd.ncu.edu.tw/~chen/course/Neural/ch4/index.htm)