## Introdução ao K-means
[link text](https://pt.wikipedia.org/wiki/K-means)
![picture](https://drive.google.com/uc?export=view&id=160HEkR11QhXN8id6pJLIvN-6lKQiUwZo)

## K-Means Versão spark
  Neste exemplo consideramos que geramos 1000 pontos randomicos, no intervalo
  [1,5000] e queremos clusterizar em k grupos.

Iniciamos instalando o pyspark

In [None]:
pip install pyspark


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m5.5 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=3e052692385662b9be59a8637b88da0bcd184851fd1aa2dfe8c4b4eeec3a922f
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

## Importamos pyspark - API python para Spark
##            radom - para geração pesudo-aleatório de inteiros

In [None]:
import pyspark as ps
import random

# Importamos o SparkContext para definição do espaço RDD
- Deve existir apenas um SparkContext por job spark
- Note que estamos usando a API RDD.. Mais adiante, usaremos Dataframe

In [None]:
from pyspark.context import SparkContext
sc= SparkContext(appName="K-means", master="local[4]")

## Iniciamos um acumulador - Fica armazenado no Driver e pode ser acumulado
## a partir dos nós trabalhadores

In [None]:
# creates and initializes an accumulator to zero
acum=sc.accumulator(0)

## Criamos uma classe para iniciar as duas listas:

*   centroids - conjunto de centroids dos clustes. Inicialmente vazia
*   points - lista de pontos a clusterizar


  -

In [None]:
class K_means_exemplo_MR:
  def __init__(self, k):
    self.centroids=random.sample(range(1,1000),k)
    self.points=sc.parallelize(random.sample(range(1,5000),1000),2)



## Função findCluster
  input= p-> ponto sendo avaliado; centroids-> lista de centroids
  Objetivo:
    c=varrer a lista e encontrar o centroid mais proximo ao ponto "p"

In [None]:
def findCluster(p,centroids):
    min=999999
    for c in centroids:
      newdist=abs(p-c)
      if  newdist < min:
        centroid=c
        min = newdist
    return [centroid,p]

## Função K_Means - função principal
  obtém a lista de centroids  randômicos - iniciais (centroids_updt)
  inicializa a lista de centroids
  realiza um loop até que o processo convirja e centroids == centroids_updt
     a função map chama a findCluster-> forma o par (centroid, ponto)
     utiliza a função groupByKey para gerar (K,[V])
     calcula a media com a soma/|V| dos valores em V
     transforma a tupla em  RDD com centroids
     incrementa no acumulador o numero de iterações

In [None]:
def k_means(k):

  k_means = K_means_exemplo_MR(k)
  centroids = list([])
  centroids_updt = k_means.centroids
  print("centroids"+str((centroids)))
  print("centroid_upd"+str(centroids_updt))
  points=k_means.points
  print(str(points.count()))
  iter=0
  while (sorted(centroids) != sorted(centroids_updt)):
    centroids=centroids_updt
    keysValues=points.map(lambda j:findCluster(j,centroids))
    #MapValues- Passa cada valor do par (key, Values) a função, sem mudar a chave
    reducedKeys=keysValues.groupByKey().mapValues(lambda x: sum(x) / len(x))
    newKeys=reducedKeys.map(lambda x:x[1])
    centroids_updt=newKeys.collect()
    acum.add(1)

  return centroids_updt

## Invoca o process

In [None]:
k=3
groups = k_means(k)
print(str(groups)+ "Number of iterations: "+str(acum))

centroids[]
centroid_upd[784, 698, 849]
1000
[2514.5324675324673, 4165.158959537573, 839.9190751445087]Number of iterations: 33
