<a href="https://colab.research.google.com/github/FGalvao77/Hands-on-with-PySpark/blob/main/01_%5BCLASSIFICATION%5D_Machine_Learning_with_PySpark_Example_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Machine Learning with PySpark: Example 1**
---
---

In [1]:
# instalando a biblioteca "pyspark" no ambiente
%%capture
%pip install pyspark -U

In [2]:
#importando biblioteca
import pyspark

In [3]:
# visualizando a versão da biblioteca
print(f'[VERSION] PySpark: {pyspark.__version__}')

[VERSION] PySpark: 3.3.2


In [4]:
# importando o módulo "SparkSession" para criar uma sessão spark
from pyspark.sql import SparkSession

In [5]:
# instanciando uma sessão para aplicação do recursos do "pyspark"
spark_session = SparkSession\
    .builder\
    .master('local[*]')\
    .appName(name='Machine Learning with Pypark: Example 1')\
    .getOrCreate()

In [6]:
# visualizando a sessão criada
spark_session

In [7]:
# importando as bibliotecas
import pandas as pd     # análise e manipulação de dados
import warnings         # ignorar mensagens de alerta

warnings.filterwarnings('ignore')

In [8]:
# utilizando uma url para carregar o conjunto de dados no objeto "df"
df = pd.read_csv('https://query.data.world/s/6na4pihcmd7lg24xiveadvsshzvhr3?dws=00000')

# utilizando a função ".to_csv()" para salvar o conteúdo do objeto em arquivo 
# do tipo ".csv" com o nome de "data"
df.to_csv('data.csv')

In [9]:
# utilizando o comando "!head" para visualizar o conteúdo do arquivo "data.csv"
!head data.csv

,sepal_length,sepal_width,petal_length,petal_width,species
0,5.1,3.5,1.4,0.2,setosa
1,4.9,3.0,1.4,0.2,setosa
2,4.7,3.2,1.3,0.2,setosa
3,4.6,3.1,1.5,0.2,setosa
4,5.0,3.6,1.4,0.2,setosa
5,5.4,3.9,1.7,0.4,setosa
6,4.6,3.4,1.4,0.3,setosa
7,5.0,3.4,1.5,0.2,setosa
8,4.4,2.9,1.4,0.2,setosa


In [10]:
# com a sessão spark, utilizaremos a função "read.csv()" para realizar a leitura
# e o carregamento do conteúdo do obejto "data.csv" no objeto "iris"
iris = spark_session.read.csv(
    path='data.csv',        # caminho onde o objeto se encontra
    header=True,            # cabeçalho do arquivo
    inferSchema=True,       # esquema do tipo de dados
    sep=','                 # cada instância/observação está separada por vírgula (,)
)

In [11]:
# visualizando o tipo do objeto
type(iris)

pyspark.sql.dataframe.DataFrame

In [12]:
# visualizando o tipo de dados em cada atributo/coluna
iris.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- sepal_length: double (nullable = true)
 |-- sepal_width: double (nullable = true)
 |-- petal_length: double (nullable = true)
 |-- petal_width: double (nullable = true)
 |-- species: string (nullable = true)



In [13]:
# visualizando as primeiras observações do objeto "iris"
iris.show()     # como padrão, temos o retorno das 20 primeiras observações

+---+------------+-----------+------------+-----------+-------+
|_c0|sepal_length|sepal_width|petal_length|petal_width|species|
+---+------------+-----------+------------+-----------+-------+
|  0|         5.1|        3.5|         1.4|        0.2| setosa|
|  1|         4.9|        3.0|         1.4|        0.2| setosa|
|  2|         4.7|        3.2|         1.3|        0.2| setosa|
|  3|         4.6|        3.1|         1.5|        0.2| setosa|
|  4|         5.0|        3.6|         1.4|        0.2| setosa|
|  5|         5.4|        3.9|         1.7|        0.4| setosa|
|  6|         4.6|        3.4|         1.4|        0.3| setosa|
|  7|         5.0|        3.4|         1.5|        0.2| setosa|
|  8|         4.4|        2.9|         1.4|        0.2| setosa|
|  9|         4.9|        3.1|         1.5|        0.1| setosa|
| 10|         5.4|        3.7|         1.5|        0.2| setosa|
| 11|         4.8|        3.4|         1.6|        0.2| setosa|
| 12|         4.8|        3.0|         1

In [14]:
# nome das colunas
iris.columns

['_c0',
 'sepal_length',
 'sepal_width',
 'petal_length',
 'petal_width',
 'species']

In [15]:
# realizando uma seleção das colunas - excluindo a coluna "_c0"
iris = iris.select(
    'sepal_length',
    'sepal_width',
    'petal_length',
    'petal_width',
    'species'
)

In [16]:
# contabilizando a quantidade de observações do atributo alvo em cada instância
iris.groupBy('species').count().sort('species').toPandas()

Unnamed: 0,species,count
0,setosa,50
1,versicolor,50
2,virginica,50


In [17]:
# particionando os dados para treino e validação
(train_data, val_data) = iris.randomSplit(
    weights=[0.70, 0.30],   # 70% para treino e o restante, 30% para validação
    seed=1522               # instanciando a semente aleatória
)

In [18]:
# visualizando as 10 primeiras observações dos dados de treino
train_data.show(n=10)

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         4.3|        3.0|         1.1|        0.1| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.4|        3.0|         1.3|        0.2| setosa|
|         4.4|        3.2|         1.3|        0.2| setosa|
|         4.5|        2.3|         1.3|        0.3| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         4.6|        3.6|         1.0|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.8|        3.0|         1.4|        0.1| setosa|
|         4.8|        3.0|         1.4|        0.3| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 10 rows



In [19]:
# visualizando as 10 primeiras observações dos dados de validação
val_data.show(n=10)

+------------+-----------+------------+-----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|
+------------+-----------+------------+-----------+----------+
|         4.6|        3.1|         1.5|        0.2|    setosa|
|         4.6|        3.2|         1.4|        0.2|    setosa|
|         4.7|        3.2|         1.6|        0.2|    setosa|
|         4.8|        3.4|         1.9|        0.2|    setosa|
|         4.9|        2.5|         4.5|        1.7| virginica|
|         4.9|        3.1|         1.5|        0.1|    setosa|
|         4.9|        3.1|         1.5|        0.1|    setosa|
|         5.0|        2.0|         3.5|        1.0|versicolor|
|         5.0|        3.6|         1.4|        0.2|    setosa|
|         5.1|        2.5|         3.0|        1.1|versicolor|
+------------+-----------+------------+-----------+----------+
only showing top 10 rows



In [20]:
# importando as funções "VectorAssembler" e "StringIndexer" 
# para realizar o tratamento dos dados
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [21]:
# instanciando o objeto "vector" com a função "VectorAssembler", tem objetivo  
# de criar um vetor com os dados de entrada - variáveis independentes
vector = VectorAssembler(
    inputCols=['sepal_length', 'sepal_width',   # dados de entrada 
               'petal_length','petal_width'],
    outputCol='independent_variables'           # dados de saída - vetor de saída
)

In [22]:
# instanciando o objeto "indexer" com a função "StringIndexer", tem objetivo
# de realizar a transformação dos dados categóricos em numéricos
indexer = StringIndexer(
    inputCol='species',             # dados de entrada da variável alvo
    outputCol='dependent_variable'  # dados de saída após o tratamento 
)

In [23]:
# importando a função para aplicação do modelo de classificação
# de uma rede neural multicamadas do tipo perceptron
from pyspark.ml.classification import MultilayerPerceptronClassifier

In [24]:
# instanciando o modelo e definindo os seus parâmetros
mlp = MultilayerPerceptronClassifier(
    featuresCol='independent_variables',    # vetor - variávies independentes 
    labelCol='dependent_variable',          # alvo - variável dependente 
    maxIter=100,                            # quantidade de iterações
    seed=1522,                              # semente aleatória
    layers=[4,5,4,3]                        # camadas da rede
)

In [25]:
# importando a função "Pipeline" para realizar ciclo de aplicação dos objetos
from pyspark.ml import Pipeline

In [26]:
# instanciando o objeto "pipeline" e definindo os seus estágios
# vetor: dados de entrada (variáveis independentes)
# indexer: dados do alvo (variável dependente)
#  mlp: modelo definido 
pipeline = Pipeline(
    stages=[vector, indexer, mlp]
)

In [27]:
# realizando o treinamento do objeto "pipeline" com dos dados de treino
model = pipeline.fit(train_data)

In [28]:
# realizando a predição com o modelo nos dados de validação
pred = model.transform(val_data)

In [29]:
# visualizando as predições realizadas
pred.show()

+------------+-----------+------------+-----------+----------+---------------------+------------------+--------------------+--------------------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|independent_variables|dependent_variable|       rawPrediction|         probability|prediction|
+------------+-----------+------------+-----------+----------+---------------------+------------------+--------------------+--------------------+----------+
|         4.6|        3.1|         1.5|        0.2|    setosa|    [4.6,3.1,1.5,0.2]|               0.0|[11.3664051224641...|[0.99993676886739...|       0.0|
|         4.6|        3.2|         1.4|        0.2|    setosa|    [4.6,3.2,1.4,0.2]|               0.0|[11.9418699485559...|[0.99997249196655...|       0.0|
|         4.7|        3.2|         1.6|        0.2|    setosa|    [4.7,3.2,1.6,0.2]|               0.0|[11.3328332879374...|[0.99993362290403...|       0.0|
|         4.8|        3.4|         1.9|        0.2|    set

In [30]:
# realizando uma seleção das observações de maior interesse 
# variáveis independentes, variável dependente, predição crua, probabilidade  e predição final
pred.select('independent_variables', 'dependent_variable',
            'rawPrediction', 'probability', 'prediction').show()

+---------------------+------------------+--------------------+--------------------+----------+
|independent_variables|dependent_variable|       rawPrediction|         probability|prediction|
+---------------------+------------------+--------------------+--------------------+----------+
|    [4.6,3.1,1.5,0.2]|               0.0|[11.3664051224641...|[0.99993676886739...|       0.0|
|    [4.6,3.2,1.4,0.2]|               0.0|[11.9418699485559...|[0.99997249196655...|       0.0|
|    [4.7,3.2,1.6,0.2]|               0.0|[11.3328332879374...|[0.99993362290403...|       0.0|
|    [4.8,3.4,1.9,0.2]|               0.0|[10.5020792081146...|[0.99977930026643...|       0.0|
|    [4.9,2.5,4.5,1.7]|               2.0|[-70.366748163505...|[5.69099099464811...|       2.0|
|    [4.9,3.1,1.5,0.1]|               0.0|[12.6336611675073...|[0.99998988635083...|       0.0|
|    [4.9,3.1,1.5,0.1]|               0.0|[12.6336611675073...|[0.99998988635083...|       0.0|
|    [5.0,2.0,3.5,1.0]|               1.

In [31]:
# importando a função para realizar avaliação do modelo
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [32]:
# instanciando o objeto avaliador e definindo os parâmetros mínimos necessários
eval = MulticlassClassificationEvaluator(
    predictionCol='prediction',         # coluna da predição final realizada
    labelCol='dependent_variable',      # coluna do atributo alvo - valor real
    metricName='accuracy'               # métrica de avaliação
)

In [33]:
# aplicando o objeto avaliador "eval" nas predições realizadas pelo modelo
# e salvando o resultado no objeto "acc"
acc = eval.evaluate(pred)

In [34]:
# visualizando o resultado final da acurácia do modelo
print('\t[ACCURACY]')
print('=' * 30)
print(f'MLP: {(acc * 100):.2f}%')

	[ACCURACY]
MLP: 100.00%


In [35]:
# visualizando as variávies do ambiente
%whos

Variable                            Type                                 Data/Info
----------------------------------------------------------------------------------
MulticlassClassificationEvaluator   ABCMeta                              <class 'pyspark.ml.evalua<...>ClassificationEvaluator'>
MultilayerPerceptronClassifier      ABCMeta                              <class 'pyspark.ml.classi<...>yerPerceptronClassifier'>
Pipeline                            ABCMeta                              <class 'pyspark.ml.pipeline.Pipeline'>
SparkSession                        type                                 <class 'pyspark.sql.session.SparkSession'>
StringIndexer                       ABCMeta                              <class 'pyspark.ml.feature.StringIndexer'>
VectorAssembler                     ABCMeta                              <class 'pyspark.ml.feature.VectorAssembler'>
acc                                 float                                1.0
df                                  D

In [36]:
# listando as variáveis do ambiente em forma de lista
%who_ls

['MulticlassClassificationEvaluator',
 'MultilayerPerceptronClassifier',
 'Pipeline',
 'SparkSession',
 'StringIndexer',
 'VectorAssembler',
 'acc',
 'df',
 'eval',
 'indexer',
 'iris',
 'mlp',
 'model',
 'pd',
 'pipeline',
 'pred',
 'pyspark',
 'spark_session',
 'train_data',
 'val_data',
 'vector',