In [142]:
#instalando o spark
!pip install pyspark



In [143]:
# Create (or reuse) the SparkSession that every later cell works through.
from pyspark.sql import SparkSession

# Builder chain written inside parentheses instead of backslash continuations.
spark = (
    SparkSession.builder
    .appName("Aula 5.2 - SparkML - Pipelines")
    .getOrCreate()
)

# Last expression of the cell: display the Spark version in use.
spark.version

'3.4.1'

In [211]:
# Load the training CSV. The first line holds the column names and the column
# types are inferred from the data. Real booleans are passed for the flags
# instead of the strings 'True', which only worked through string parsing.
income_df = spark.read.csv("train.csv", header = True, inferSchema = True)

# Print the inferred column names and types.
income_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: double (nullable = true)
 |-- education: string (nullable = true)
 |-- educational-num: double (nullable = true)
 |-- marital-status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- capital-gain: double (nullable = true)
 |-- capital-loss: double (nullable = true)
 |-- hours-per-week: double (nullable = true)
 |-- native-country: string (nullable = true)
 |-- income: string (nullable = true)



In [212]:
# Preview the first five rows of the dataset.
income_df.show(n = 5)

+---+----------+--------+-------------+---------------+-------------------+-----------------+--------------+------+------+------------+------------+--------------+--------------+-------+
|age| workclass|  fnlwgt|    education|educational-num|     marital-status|       occupation|  relationship|  race|gender|capital-gain|capital-loss|hours-per-week|native-country| income|
+---+----------+--------+-------------+---------------+-------------------+-----------------+--------------+------+------+------------+------------+--------------+--------------+-------+
| 67|   Private|366425.0|    Doctorate|           16.0|           Divorced|  Exec-managerial| Not-in-family| White|  Male|     99999.0|         0.0|          60.0| United-States| =>50k.|
| 17|   Private|244602.0|         12th|            8.0|      Never-married|    Other-service|     Own-child| White|  Male|         0.0|         0.0|          15.0| United-States|  <50k.|
| 31|   Private|174201.0|    Bachelors|           13.0| Married-c

In [213]:
# Basic statistics (count, mean, stddev, min, max) for the 'age' column.
(
    income_df
    .describe('age')
    .show()
)

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|             43957|
|   mean| 38.61714857701845|
| stddev|13.734400969233446|
|    min|                17|
|    max|                90|
+-------+------------------+



In [214]:
# summary() adds the 25%/50%/75% percentiles to the statistics given by describe().
(
    income_df
    .select('hours-per-week')
    .summary()
    .show()
)

+-------+-----------------+
|summary|   hours-per-week|
+-------+-----------------+
|  count|            43957|
|   mean|40.40769388265805|
| stddev|12.40030286325265|
|    min|              1.0|
|    25%|             40.0|
|    50%|             40.0|
|    75%|             45.0|
|    max|             99.0|
+-------+-----------------+



In [215]:
# Same summary on a string column: only count, min and max are meaningful
# here; mean, stddev and the percentiles come back as null.
(
    income_df
    .select('marital-status')
    .summary()
    .show()
)

+-------+--------------+
|summary|marital-status|
+-------+--------------+
|  count|         43957|
|   mean|          null|
| stddev|          null|
|    min|      Divorced|
|    25%|          null|
|    50%|          null|
|    75%|          null|
|    max|       Widowed|
+-------+--------------+



In [216]:
# Number of rows per gender, most frequent value first.
(
    income_df
    .groupBy('gender')
    .count()
    .sort("count", ascending = False)
    .show()
)

+-------+-----+
| gender|count|
+-------+-----+
|   Male|29400|
| Female|14557|
+-------+-----+



In [217]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [218]:
# String-typed feature columns that will be indexed and one-hot encoded.
categoricalCols = [
    'workclass',
    'education',
    'marital-status',
    'occupation',
    'relationship',
    'race',
    'gender',
]

# Display the list as the cell output.
categoricalCols

['workclass',
 'education',
 'marital-status',
 'occupation',
 'relationship',
 'race',
 'gender']

In [219]:
# Build the estimators (objects implementing fit()); fitting them returns the
# transformers that are then applied to the dataset.

# handleInvalid="keep" maps categories that were not seen during fit() to an
# extra index instead of raising an error. With random train/test splits a
# rare category can end up only in the test set, which would otherwise make
# transform() fail.
stringIndexer = StringIndexer(
    inputCols = categoricalCols,
    outputCols = [x + "Index" for x in categoricalCols],
    handleInvalid = "keep",
)
oneHotEncoder = OneHotEncoder(
    inputCols = stringIndexer.getOutputCols(),
    outputCols = [x + 'OHE' for x in categoricalCols],
)

# The target column 'income' holds two string classes (the data preview shows
# them as "=>50k." and "<50k."); a StringIndexer converts them into the
# numeric label the classifier needs. It keeps the default error behaviour so
# an unexpected label value is reported rather than silently accepted.
labelToIndex = StringIndexer(inputCol = 'income', outputCol = 'income_label')

In [220]:
from pyspark.ml.feature import VectorAssembler

# Numeric feature columns, used as they are.
numericCols = [
    'age',
    'fnlwgt',
    'educational-num',
    'capital-gain',
    'capital-loss',
    'hours-per-week',
]

# Model inputs: the one-hot encoded categorical columns followed by the numeric ones.
allCols = [f"{name}OHE" for name in categoricalCols]
allCols.extend(numericCols)

# VectorAssembler is a transformer: it combines the listed input columns into
# a single vector column named "features".
vecAssembler = VectorAssembler(
    inputCols = allCols,
    outputCol = "features",
)

In [221]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree that learns 'income_label' from the assembled "features" vector.
dtc = DecisionTreeClassifier(
    featuresCol = "features",
    labelCol = 'income_label',
)

In [222]:
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml import Pipeline

# A Pipeline is an ordered sequence of stages; it is itself an estimator.
pipeline = Pipeline(stages = [stringIndexer, oneHotEncoder, labelToIndex, vecAssembler, dtc])

# Split the data 70/30 into training and test sets. The seed is fixed so that
# re-running the notebook produces the same split and the same accuracy.
train_data, test_data = income_df.randomSplit([0.7,0.3], seed = 42)

# Fit the whole pipeline on the training data only.
pipelineModel = pipeline.fit(train_data)

# Apply the fitted pipeline model to the held-out test data.
predictionsDF = pipelineModel.transform(test_data)


In [223]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy: fraction of predictions that match the true label.
evaluator = MulticlassClassificationEvaluator(
    labelCol = 'income_label',
    metricName = 'accuracy',
)

# Score the test-set predictions and report the result.
accuracy = evaluator.evaluate(predictionsDF)
print(f"Acurácia: {accuracy}")

Acurácia: 0.8452514383919898


Quero repetir o teste 100 vezes para analisar os resultados

In [224]:
# Repeat the split / fit / evaluate cycle 100 times to see how much the
# accuracy varies from one random split to another.

# The pipeline definition is the same for every run, so it is built once
# outside the loop; each fit() still returns a fresh model.
pipeline = Pipeline(stages = [stringIndexer, oneHotEncoder, labelToIndex, vecAssembler, dtc])

resultados = []
for i in range(100):
  # Seed each split with the iteration number: every run gets a different
  # partition, yet re-executing the cell reproduces the same 100 results.
  train_data, test_data = income_df.randomSplit([0.7,0.3], seed = i)
  pipelineModel = pipeline.fit(train_data)
  predictionsDF = pipelineModel.transform(test_data)
  resultados.append(evaluator.evaluate(predictionsDF))



In [225]:
import pandas as pd
import numpy as np

# Put the collected accuracy values into a single-column DataFrame (column label 0).
df = pd.DataFrame(np.array(resultados))

# Column-wise mean and standard deviation (pandas' default sample std, ddof=1).
a = df.mean()
b = df.std()

# Pull out the scalar for column 0 and print both statistics.
mean_acc, std_acc = a[0], b[0]
print(f"a média de 100 testes foi {mean_acc:.2f}, e o desvio padrão  foi {std_acc:.5f}")

a média de 100 testes foi 0.84, e o desvio padrão  foi 0.00403
