# Descobrindo o LVQ em Python

Um dos principais pontos fracos (ou desvantagens) do kNN é que ele te obriga a depender do seu conjunto de dados **inteiro**. Isso porque para fornecer uma classificação para uma instância desconhecida, ele a compara com **todas** as instâncias conhecidas.

O  algoritmo *Learning Vector Quantization* (LVQ) é como se fosse uma solução para esse problema. Ele é parecido com o kNN, porém com ele você pode escolher o tamanho do conjunto de dados que você será "obrigado" a depender. Isso porque o LVQ é um método de aprendizado baseado em "protótipo": um ou mais "protótipos" ou vetores são utilizados para representar cada classe no dataset, e cada protótipo é descrito como um ponto no espaço de amostras.

Ele ainda realiza sua classificação a partir de uma comparação com instâncias conhecidas, assim como o  kNN. A diferença é que essa comparação não precisa ser feita com todo o conjunto de dados. Apenas aquelas instâncias que melhor resumem o conjunto (os protótipos) são levadas em consideração!

Existem diversas variações desse algoritmo. No corpo deste trabalho, iremos estudar o LVQ como algoritmo para classificação (aprendizado supervisionado).

# Pegando a intuição

Se tivermos uma quantidade muito gigantesca de dados, rodar um programa que explora o conjunto inteiro pode ser muito caro, como no caso do kNN.

O ideal seria diminuir o tamanho desse conjunto de dados, ou fazer um resumo dele. E isso pode ser feito pegando as instâncias que melhor resumem, ou melhor representam, o restante do conjunto.

Considere a imagem abaixo. Nela, podemos ver uma distribuição de várias instâncias de um conjunto de dados qualquer, com três classes: vermelho, verde e azul. Se fôssemos aplicar o kNN com um valor de k igual a 1 (por exemplo), é assim que o modelo faria suas classificações:

![i](src/lvq.png)
*Fonte: wikipedia*

Dado um ponto novo e desconhecido, bastaria posicioná-lo na imagem acima e ver qual a cor da região em que ele se encaixa. Essa seria a sua classificação.

Agora, perceba que em cada região dessas, existem vários pontos da mesma classe. Por exemplo, na região azul mais central, tem vários pontos azuis, assim como a região vermelha de cima tem vários pontos vermelhos.

A sacada do LVQ é pegar somente um ou alguns pontos que fiquem bem no centro dessas regiões, como se fizesse um resumo dessa quantidade tão grande de pontos. Na imagem abaixo, esses pontos representantes são os círculos: 

![img](src/lvqresumindo.gif)
*Fonte: [Willamette University](https://www.willamette.edu/~gorr/classes/cs449/Unsupervised/competitive.html)*


O resultado é um espaço subdividido em regiões conhecido como **Tesselação de voronoi**, conforme a figura abaixo:

![img](src/voronoi.gif)
*Fonte: [Willamette University](https://www.willamette.edu/~gorr/classes/cs449/Unsupervised/competitive.html)*


Feito isso, basta usar esses pontos que são representativos do todo, e aplicar o kNN considerando-os como o novo e resumido conjunto de dados!

# Implementando

A representação para o LVQ é uma coleção específica de instâncias, que serão **parecidas** com as instâncias presentes no seu conjunto de dados. Aos elementos dessa coleção, daremos o nome de **vetores**.Esses vetores são aprendidos ou descobertos a partir do conjunto de dados de treino. 

Nessa implementação, iremos aprender a partir de um conjunto de dados produzido com o sinal de 16 antenas de alta frequência (radares). O alvo dessas antenas são os elétrons livres na ionosfera (parte superior da atmosfera, acima da estratosfera). Sinais "bons" de radar são aqueles que exibem evidência da presença de algum tipo de estrutura na ionosfera. Sinais "ruins" não. Seus sinais passam direto através da ionosfera. [Mais detalhes](https://archive.ics.uci.edu/ml/datasets/Ionosphere)

Resumindo: esses dados que usaremos trazem informações a respeito da estrutura da ionosfera de acordo com o retorno dos radares. Cada instância descreve as propriedades do retorno do radar, e nosso objetivo será predizer se tem ou não tem algum tipo de estrutura na ionosfera.

São 351 registros no total. Cada um é composto por 34 valores numéricos, e mais um valor que pode ser "g" para sinal bom (good), e "b" para sinal ruim (bad).

### Medida de similaridade: distância euclidiana

Para descobrir o quanto um dado vetor é bom em representar os demais, precisaremos de uma medida de similaridade: o quanto esse vetor está próximo dos outros. Para isso, usamos a distância euclidiana, que pode ser usada para espaços de qualquer dimensão:

In [1]:
from math import sqrt
def distancia_euclidiana(linha1, linha2):
	distancia = 0.0
	for i in range(len(linha1)-1):
		distancia += (linha1[i] - linha2[i])**2
	return sqrt(distancia)

Note que essa função assume que a última coluna de cada linha é a classificação ("g" ou "b"), que deve ser ignorada para o cálculo da distância

Agora, podemos usar esse cálculo da distância para localizar o vetor que mais se aproxima de um novo dado. Para fazer isso, calculamos a distância entre cada vetor e o novo dado (usando a função acima).

Uma vez que as distâncias forem calculadas, podemos fazer uma ordenação para pegar aquele vetor que tiver a menor distância. Abaixo, o código implementa isso:

In [2]:
def pegar_vetor_mais_proximo(vetores, instancia_teste):
	distancias = list()
	for vetor in vetores:
		dist = distancia_euclidiana(vetor, instancia_teste)
		distancias.append((vetor, dist))
	distancias.sort(key=lambda tup: tup[1])
	return distancias[0][0]

Podemos testar essa função com um pequeno conjunto de dados inventado:

In [3]:
conjunto_dados = [[2.7810836,2.550537003,0],
	[1.465489372,2.362125076,0],
	[3.396561688,4.400293529,0],
	[1.38807019,1.850220317,0],
	[3.06407232,3.005305973,0],
	[7.627531214,2.759262235,1],
	[5.332441248,2.088626775,1],
	[6.922596716,1.77106367,1],
	[8.675418651,-0.242068655,1],
	[7.673756466,3.508563011,1]]
instancia_teste = conjunto_dados[0]
vetor = pegar_vetor_mais_proximo(conjunto_dados, instancia_teste)
print(vetor)

[2.7810836, 2.550537003, 0]


Rodando esse exemplo, vemos o vetor mais próximo ao primeiro registro do conjunto. Como esperado, o primeiro registro é o mais próximo dele mesmo!

Agora que já sabemos como encontrar o vetor mais próximo, temos que aprender como faz para encontrar quais são os vetores que melhor resumem o restante do conjunto.

### Treinando os vetores

O primeiro passo para o treinamento é inicializar o conjunto de vetores. Uma maneira de fazer isso é pegar valores aleatórios que estão presentes no nosso conjunto de dados, e ir montando os vetores assim.

O código abaixo recebe o conjunto de dados de treino, e pega colunas e linhas aleatórias para montar um vetor completamente aleatório (mas com valores encontrado nos dados reais):

In [4]:
def vetor_aleatorio(dados):
	qtd_registros = len(dados)
	qtd_colunas = len(dados[0])
	vetor = [dados[randrange(qtd_registros)][i] for i in range(qtd_colunas)]
	return vetor

Uma vez que inicializamos os vetores aleatórios, devemos ajustá-los para que eles sejam uma coisa que faz mais sentido, de um jeito que sejam uma boa representação do restante do conjunto. E isso será feito iterativamente:

1. Epochs ou Épocas: De maneira geral, o processo vai ser repetido um número fixo de vezes. Em cada época, os vetores são expostos ao conjunto de treino e adaptados a ele.
2. Conjunto de treino: Dentro de uma época, cada instância do nosso conjunto de dados é usado, um de cada vez, para ajustar o valor dos nossos vetores.

Esse ajuste é feito da seguinte forma:

1. Se o vetor é da mesma classe da instância da vez, então empurramos o vetor para mais perto da instância
2. Se o vetor **não** é da mesma classe da instância da vez, então empurramos o vetor para mais **longe** da instância

Para cada instância dentro do conjunto de dados, nós estudamos apenas o vetor mais próximo. E ele, e somente ele, vai ser ajustado com base nessa instância.

Além disso, nós podemos usar a diferença entre esse vetor e essa instância do conjunto de dados como uma métrica para saber qual o tamanho do erro do nosso algoritmo. É esse erro que usamos para decidir quanto vamos empurrar o nosso vetor, para mais perto ou para mais longe de um elemento do conjunto.

Ainda por cima, vamos ajustar essa quantidade por uma taxa de aprendizado (learning rate). Por exemplo, uma taxa de aprendizado de 0.3 quer dizer que os vetores apenas são empurrados por 30% do erro ou diferença entre eles e a instância.

E essa taxa de aprendizado será ajustada também. Queremos que ela tenha o máximo efeito na primeira época, e vá decaindo conforme o treinamento continue, até que seu efeito seja minimizado nas últimas épocas. Fazemos isso porque, como os vetores são inicializados aleatoriamente, provavelmente eles estão muito longe do estado ótimo ou ideal. Mas conforme vamos treinando, eles estão ficando cada vez melhores, então não queremos que eles mudem rápido demais, pra evitar de deixar passar o estado ideal.

Podemos expressar essa taxa de aprendizado assim:

In [None]:
taxa = taxa_aprendizado * (1.0 - (epoca/epocas_total))

E podemos testar essa equação colocando uma taxa de aprendizado de 0.3 e 10 épocas:

Epoca	Taxa de aprendizado
0		0.3
1		0.27
2		0.24
3		0.21
4		0.18
5		0.15
6		0.12
7		0.09
8		0.06
9		0.03

Agora, podemos juntar tudo isso. Abaixo, a função implementa o procedimento para treinar um conjunto de vetores, dado um conjunto de dados para treino.

Só que essa função recebe três outras informações importantes: quantos vetores criar e treinar, a taxa de aprendizado e por quantas épocas deve durar o treinamento.

In [5]:
def treinar_vetores(dados, qtd_vetores, taxa_aprendizado, epocas):
	vetores = [vetor_aleatorio(dados) for i in range(qtd_vetores)]
	for epoca in range(epocas):
		taxa = taxa_aprendizado * (1.0-(epoca/float(epocas)))
		soma_erros = 0.0
		for linha in dados:
			vetor = pegar_vetor_mais_proximo(vetores, linha)
			for i in range(len(linha)-1):
				erro = linha[i] - vetor[i]
				soma_erros += erro**2
				if vetor[-1] == linha[-1]:
					vetor[i] += taxa * erro
				else:
					vetor[i] -= taxa * erro
		print('>epoca=%d, taxa_aprendizado=%.3f, erro=%.3f' % (epoca, taxa, soma_erros))
	return vetores

Perceba que essa função também mostra os erros em cada época, junto com a taxa de aprendizado. E ela faz uso das outras funções que definimos mais acima, para inicializar os vetores e encontrar o vetor mais próximo.

Vamos demonstrar o uso de tudo o que fizemos até aqui com um conjunto de dados inventado:



In [6]:
from math import sqrt
from random import randrange
from random import seed

# calcular a distancia euclidiana entre dois vetores
def distancia_euclidiana(linha1, linha2):
	distancia = 0.0
	for i in range(len(linha1)-1):
		distancia += (linha1[i] - linha2[i])**2
	return sqrt(distancia)

# localizar o vetor mais próximo
def pegar_vetor_mais_proximo(vetores, instancia_teste):
	distancias = list()
	for vetor in vetores:
		dist = distancia_euclidiana(vetor, instancia_teste)
		distancias.append((vetor, dist))
	distancias.sort(key=lambda tup: tup[1])
	return distancias[0][0]

# criar um vetor aleatório
def vetor_aleatorio(dados):
	qtd_registros = len(dados)
	qtd_colunas = len(dados[0])
	vetor = [dados[randrange(qtd_registros)][i] for i in range(qtd_colunas)]
	return vetor

# treinar um conjunto de vetores
def treinar_vetores(dados, qtd_vetores, taxa_aprendizado, epocas):
	vetores = [vetor_aleatorio(dados) for i in range(qtd_vetores)]
	for epoca in range(epocas):
		taxa = taxa_aprendizado * (1.0-(epoca/float(epocas)))
		soma_erros = 0.0
		for linha in dados:
			vetor = pegar_vetor_mais_proximo(vetores, linha)
			for i in range(len(linha)-1):
				erro = linha[i] - vetor[i]
				soma_erros += erro**2
				if vetor[-1] == linha[-1]:
					vetor[i] += taxa * erro
				else:
					vetor[i] -= taxa * erro
		print('>epoca=%d, taxa_aprendizado=%.3f, erro=%.3f' % (epoca, taxa, soma_erros))
	return vetores

# testar a função que treina
seed(1)
conjunto_dados = [[2.7810836,2.550537003,0],
	[1.465489372,2.362125076,0],
	[3.396561688,4.400293529,0],
	[1.38807019,1.850220317,0],
	[3.06407232,3.005305973,0],
	[7.627531214,2.759262235,1],
	[5.332441248,2.088626775,1],
	[6.922596716,1.77106367,1],
	[8.675418651,-0.242068655,1],
	[7.673756466,3.508563011,1]]
taxa_aprendizado = 0.3
qtd_epocas = 10
qtd_vetores = 2
vetores = treinar_vetores(conjunto_dados, qtd_vetores, taxa_aprendizado, qtd_epocas)
print('Vetores: %s' % vetores)


>epoca=0, taxa_aprendizado=0.300, erro=139.812
>epoca=1, taxa_aprendizado=0.270, erro=47.368
>epoca=2, taxa_aprendizado=0.240, erro=27.535
>epoca=3, taxa_aprendizado=0.210, erro=26.241
>epoca=4, taxa_aprendizado=0.180, erro=25.509
>epoca=5, taxa_aprendizado=0.150, erro=24.778
>epoca=6, taxa_aprendizado=0.120, erro=24.053
>epoca=7, taxa_aprendizado=0.090, erro=23.344
>epoca=8, taxa_aprendizado=0.060, erro=22.653
>epoca=9, taxa_aprendizado=0.030, erro=21.981
Vetores: [[7.3188612290158614, 1.9696349335193466, 1], [2.4304257696446854, 2.8396012380964555, 0]]


Esse exemplo treina um conjunto de 2 vetores ao longo de 10 épocoas com uma taxa de aprendizado de 0.3. Os detalhes são mostrados com o print a cada época e os vetores no final são exibidos.

Note como o erro começa grande no começo, e vai gradualmente caindo.


Finalmente, agora que sabemos como treinar esses vetores para resumir um conjunto de dados, vamos aplicar esse algoritmo em um conjunto de dados real!

### Caso de estudo: Ionosfera

O primeiro passo é carregar o conjunto de dados e converter suas informações para valores numéricos que podemos usar nos cálculos de distância. Para isso, fazemos funções para:
1. carregar os dados de um arquivo csv
2. converter números em string para float
3. converter a coluna de classe para valores inteiros

Em seguida, também vamos fazer funções que irão validar o nosso algoritmo usando [validação cruzada](https://pt.wikipedia.org/wiki/Valida%C3%A7%C3%A3o_cruzada).

Este é o exemplo completo:

In [7]:
from random import seed
from random import randrange
from csv import reader
from math import sqrt

# ler arquivo CSV
def carregar_csv(nome_arquivo):
	conjunto_dados = list()
	with open(nome_arquivo, 'r') as file:
		csv_reader = reader(file)
		for linha in csv_reader:
			if not linha:
				continue
			conjunto_dados.append(linha)
	return conjunto_dados

# converter coluna de strings para coluna de floats
def str_coluna_para_float(conjunto_dados, coluna):
	for linha in conjunto_dados:
		linha[coluna] = float(linha[coluna].strip())

# converter coluna de strings para coluna de inteiros
def str_coluna_para_int(conjunto_dados, coluna):
	classes = [linha[coluna] for linha in conjunto_dados]
	unicas = set(classes)
	resultado = dict()
	for i, valor in enumerate(unicas):
		resultado[valor] = i
	for linha in conjunto_dados:
		linha[coluna] = resultado[linha[coluna]]
	return resultado

# particionar o conjunto de dados em qtd_partes
def cross_validation(conjunto_dados, qtd_partes):
	parte_conjunto = list()
	copia_conjunto = list(conjunto_dados)
	tamanho_parte = int(len(conjunto_dados) / qtd_partes)
	for i in range(qtd_partes):
		parte = list()
		while len(parte) < tamanho_parte:
			indice = randrange(len(copia_conjunto))
			parte.append(copia_conjunto.pop(indice))
		parte_conjunto.append(parte)
	return parte_conjunto

# Calcular a eficacia em percentual
def medida_eficacia(real, predicao):
	corretos = 0
	for i in range(len(real)):
		if real[i] == predicao[i]:
			corretos += 1
	return corretos / float(len(real)) * 100.0

# Avaliar um algoritmo usando o cross-validation
def avaliar_algoritmo(conjunto_dados, algoritmo, qtd_partes, *args):
	partes = cross_validation(conjunto_dados, qtd_partes)
	acertos = list()
	for parte in partes:
		conjunto_treino = list(partes)
		conjunto_treino.remove(parte)
		conjunto_treino = sum(conjunto_treino, [])
		conjunto_teste = list()
		for linha in parte:
			copia_linha = list(linha)
			conjunto_teste.append(copia_linha)
			copia_linha[-1] = None
		predicao = algoritmo(conjunto_treino, conjunto_teste, *args)
		real = [linha[-1] for linha in parte]
		eficacia = medida_eficacia(real, predicao)
		acertos.append(eficacia)
	return acertos

# calcular a distancia euclidiana entre dois vetores
def distancia_euclidiana(linha1, linha2):
	distancia = 0.0
	for i in range(len(linha1)-1):
		distancia += (linha1[i] - linha2[i])**2
	return sqrt(distancia)

# localizar o vetor mais próximo
def pegar_vetor_mais_proximo(vetores, instancia_teste):
	distancias = list()
	for vetor in vetores:
		dist = distancia_euclidiana(vetor, instancia_teste)
		distancias.append((vetor, dist))
	distancias.sort(key=lambda tup: tup[1])
	return distancias[0][0]

# fazer uma predição dentre os vetores mais próximos
def predizer(vetores, instancia_teste):
	vetor = pegar_vetor_mais_proximo(vetores, instancia_teste)
	return vetor[-1]

# criar um vetor aleatório
def vetor_aleatorio(dados):
	qtd_registros = len(dados)
	qtd_colunas = len(dados[0])
	vetor = [dados[randrange(qtd_registros)][i] for i in range(qtd_colunas)]
	return vetor

# treinar um conjunto de vetores
def treinar_vetores(dados, qtd_vetores, taxa_aprendizado, epocas):
	vetores = [vetor_aleatorio(dados) for i in range(qtd_vetores)]
	for epoca in range(epocas):
		taxa = taxa_aprendizado * (1.0-(epoca/float(epocas)))
		soma_erros = 0.0
		for linha in dados:
			vetor = pegar_vetor_mais_proximo(vetores, linha)
			for i in range(len(linha)-1):
				erro = linha[i] - vetor[i]
				soma_erros += erro**2
				if vetor[-1] == linha[-1]:
					vetor[i] += taxa * erro
				else:
					vetor[i] -= taxa * erro
	return vetores

# nosso LVQ numa função
def learning_vector_quantization(dados, teste, qtd_vetores, taxa_aprendizado, epocas):
	vetores = treinar_vetores(dados, qtd_vetores, taxa_aprendizado, epocas)
	predicoes = list()
	for linha in teste:
		output = predizer(vetores, linha)
		predicoes.append(output)
	return(predicoes)

# Testar o LVQ nos dados da ionosfera
seed(1)
# carregar e preparar os dados
nome_arquivo = 'src/ionosfera.csv'
conjunto_dados = carregar_csv(nome_arquivo)
for i in range(len(conjunto_dados[0])-1):
	str_coluna_para_float(conjunto_dados, i)
# converter a coluna de classes para inteiros
str_coluna_para_int(conjunto_dados, len(conjunto_dados[0])-1)
# avaliar o algoritmo
qtd_partes = 5
taxa_aprendizado = 0.3
qtd_epocas = 50
qtd_vetores = 20
acertos = avaliar_algoritmo(conjunto_dados, learning_vector_quantization, qtd_partes, qtd_vetores, taxa_aprendizado, qtd_epocas)
print('Acertos: %s' % acertos)
print('Media de acertos: %.3f%%' % (sum(acertos)/float(len(acertos))))

Acertos: [90.0, 88.57142857142857, 84.28571428571429, 87.14285714285714, 85.71428571428571]
Media de acertos: 87.143%


# Discussão

O LVQ é mais usado como um algoritmo de classificação. Ele suporta problemas de classificação tanto binários (com apenas duas classes), quanto multi-classes.

Contudo, como esse algoritmo trabalha fazendo resumo de um corpo maior de informações, ele também pode ser utilizado para compressão de dados. Por exemplo, se quisermos enviar informações através de uma linha telefônica nós devemos:
1. "aprender" os vetores ou protótipos
1. enviar os vetores resultantes do treinamento, em ordem
1. para cada "pedaço" da informação a ser transmitida, nós mandamos somente o índice do vetor que é mais similar (ao invés da informação em si)

Para uma grande quantidade de dados, isso pode ser uma redução significante. Por exemplo, se treinarmos um conjunto de 64 vetores, então só são necessários 6 bits para representar (indexar) esses vetores. E, se os nossos dados forem números em notação de ponto flutuante (floats, que "pesam" 4 bytes), então teríamos uma redução de 80%  ( 100*(1 - 6/32) ) em cada .

E isso pode ser aplicado em diferentes tipos de dados: números (floats), imagens, sinal sonoro...

![i](src/imgcompressao.png)
*Fonte: LIU, Ruochen et al. A new two-step learning vector quantization algorithm for image compression*

Na imagem acima, verificamos um exemplo onde o LVQ foi utilizado para realizar a compressão de uma imagem (a original na esquerda). No trabalho referenciado, foram utilizados 128 vetores treinados para resumir as diferentes possibilidades de cores encontradas na imagem. 

Dando um zoom, podemos notar as perdas de informação decorrentes desse processo:

![i](src/imgcompressaozoom.png)
*Fonte: LIU, Ruochen et al. A new two-step learning vector quantization algorithm for image compression*