# Spark Machine Learning
## Exemplo com PCA no PySpark.

Para o exemplo de utilização das Bibliotecas de Machine Learning com o Spark, ciramos um modelo com redução de dimensionalidade através da Análise de Componentes Principais (PCA). 

In [1]:
# Carrega bibliotecas base e inicia sessão:
import pyspark
from pyspark.sql import SparkSession
ss = SparkSession.builder.master("local[1]") \
                    .appName('MLModels') \
                    .getOrCreate()
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

import pyspark.sql.functions as f
import pyspark.sql.types

In [15]:
# Carrega bibliotecas para tratamento de dados
# e Machine Learning:
from pyspark.ml.feature import PCA # Biblioteca para PCA. 
from pyspark.ml.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
import numpy as np
import pandas as pd

In [8]:
# Inicia leitura de dados e criação
# de DataFrames usando Pandas:
cereal = pd.read_csv("/home/centos/spark-notebooks/Aula/PCA.csv", sep = ';', header=0)
cereal.sample()

Unnamed: 0,name,mfr,calories,protein,fat,sodium,fiber,carbo,sugars,potass,rating
5,Golden_Grahams,G,110,1,1,280,0.0,15.0,9,45,A


In [9]:
# Limpa dataframe para conversão:
cereal = cereal.drop(['mfr', 'rating'], axis = 1) # Remove qualitativas
cereal.sample()

Unnamed: 0,name,calories,protein,fat,sodium,fiber,carbo,sugars,potass
28,Just_Right_Crunchy__Nuggets,110,2,1,170,1.0,17.0,6,60


In [10]:
# Converte pandas DF para Spark DF:
sp_cereal =  ss.createDataFrame(cereal)

In [11]:
# Amostra de dados:
sp_cereal.show(5)

+--------------------+--------+-------+---+------+-----+-----+------+------+
|                name|calories|protein|fat|sodium|fiber|carbo|sugars|potass|
+--------------------+--------+-------+---+------+-----+-----+------+------+
|        Cap'n'Crunch|     120|      1|  2|   220|  0.0| 12.0|    12|    35|
|Cinnamon_Toast_Cr...|     120|      1|  3|   210|  0.0| 13.0|     9|    45|
|    Honey_Graham_Ohs|     120|      1|  2|   220|  1.0| 12.0|    11|    45|
|       Count_Chocula|     110|      1|  1|   180|  0.0| 12.0|    13|    65|
|         Cocoa_Puffs|     110|      1|  1|   180|  0.0| 12.0|    13|    55|
+--------------------+--------+-------+---+------+-----+-----+------+------+
only showing top 5 rows



Para o correto funcionamento da PCA no Spark, as colunas para cada atributo ou 'feature' precisam ser convertidas em vetores separados. Isso pode ser realizado utilizando o método de transformação "VectorAssembler", conforme é demonstrado abaixo. 

In [12]:
# Separa vetores para PCA no Spark
colunas = sp_cereal.drop('name').columns
colunas

['calories', 'protein', 'fat', 'sodium', 'fiber', 'carbo', 'sugars', 'potass']

In [16]:
# Cria objetos SPARK necessários para PCA:
assembler = VectorAssembler(inputCols=colunas, outputCol = 'properties')
output_dat = assembler.transform(sp_cereal).select('name', 'properties')
output_dat.show(5, truncate = False)

+---------------------+----------------------------------------+
|name                 |properties                              |
+---------------------+----------------------------------------+
|Cap'n'Crunch         |[120.0,1.0,2.0,220.0,0.0,12.0,12.0,35.0]|
|Cinnamon_Toast_Crunch|[120.0,1.0,3.0,210.0,0.0,13.0,9.0,45.0] |
|Honey_Graham_Ohs     |[120.0,1.0,2.0,220.0,1.0,12.0,11.0,45.0]|
|Count_Chocula        |[110.0,1.0,1.0,180.0,0.0,12.0,13.0,65.0]|
|Cocoa_Puffs          |[110.0,1.0,1.0,180.0,0.0,12.0,13.0,55.0]|
+---------------------+----------------------------------------+
only showing top 5 rows



Para garantir resultados corretos, os dados no para o modelo PCA no Python/PySpark precisam ser escalados e normalizados. Os processos de centralização e normalização da biblioteca 'pyspark.ml.feature' abaixo servem para isso. 

In [22]:
# Centralizar dados:
scaler = StandardScaler(inputCol="properties", outputCol="scaledProperties", withStd=False, withMean=True)

# Estatisticas para modelo:
scalerModel = scaler.fit(output_dat)

# Normaliza propriedades:
scaledData = scalerModel.transform(output_dat)
scaledData.select(['name', 'scaledProperties']).show(5, truncate = True)

+--------------------+--------------------+
|                name|    scaledProperties|
+--------------------+--------------------+
|        Cap'n'Crunch|[13.1168831168831...|
|Cinnamon_Toast_Cr...|[13.1168831168831...|
|    Honey_Graham_Ohs|[13.1168831168831...|
|       Count_Chocula|[3.11688311688313...|
|         Cocoa_Puffs|[3.11688311688313...|
+--------------------+--------------------+
only showing top 5 rows



Com os dados convertidos no formato adequado ao Spark e devidamente normalizados, podemos iniciar a construção dos componentes principais. 

In [23]:
# Análise de Componentes Principais:
pca = PCA(k=3, inputCol = scaler.getOutputCol(), outputCol="pca_properties")
model = pca.fit(scaledData)

transformed_property = model.transform(scaledData)

Após a construção do modelo, identificamos a variância utilizada para a separação dos componentes. 

In [30]:
np.round(100.00*model.explainedVariance.toArray(),2)

array([56.54, 40.44,  2.79])

Finalmente, podemos listar e classificar as propriedades nutricionais de acordo com suas correlações com componentes identificados.

In [31]:
# Identificação dos Componentes:
componentes = np.round(model.pc.toArray(), 4)
df_pc = pd.DataFrame(componentes, columns = ['PC1', 'PC2', 'PC3'], index = colunas)
df_pc

Unnamed: 0,PC1,PC2,PC3
calories,-0.0741,0.009,0.9864
protein,0.0013,-0.0083,0.0036
fat,0.0002,-0.0027,0.0288
sodium,-0.9919,-0.1026,-0.0734
fiber,0.0043,-0.0299,-0.0291
carbo,-0.0197,0.0185,0.0259
sugars,-0.0057,-0.002,0.1374
potass,0.1012,-0.994,0.0175


Em conclusão, o exemplo acima demonstra as capacidades dos algoritmos de Machine Learning disponíveis no Spark. Podemos observar que suas capacidades são similares aos algoritmos de Machine Learning do Python e R, mas a estrutura dos objetos do Spark (RDD e DataFrames) exigem um tratamento peculiar aos dados antes de submeter aos testes. 