# Aula 5.2 - SparkML e Pipelines

In [3]:
from pyspark.sql import SparkSession

# Build (or reuse, if one already exists) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("Aula 5.2 - SparkML e Pipelines")
    .getOrCreate()
)

# Last expression of the cell: displays the Spark version in use.
spark.version

'3.2.1'

In [4]:
# Load the socioeconomic (income) dataset.
# header=True      -> the first CSV row provides the column names
# inferSchema=True -> let Spark infer each column's type instead of reading all as strings
income_df = spark.read.csv(
    "/home/jovyan/work/income-dataset.csv",
    header=True,
    inferSchema=True,
)

# Print the resulting column names and types.
income_df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- workclass: string (nullable = true)
 |-- fnlwgt: integer (nullable = true)
 |-- education: string (nullable = true)
 |-- education_num: integer (nullable = true)
 |-- marital_status: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- relationship: string (nullable = true)
 |-- race: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- capital_gain: integer (nullable = true)
 |-- capital_loss: integer (nullable = true)
 |-- hours_per_week: integer (nullable = true)
 |-- native_country: string (nullable = true)
 |-- income: string (nullable = true)



In [5]:
# Preview the first 5 rows of the dataset.
income_df.show(n=5)

+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|education|education_num|    marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9|          Divorced|Handlers-cleaners|Not-i

In [6]:
# Basic statistics (count, mean, stddev, min, max) for the 'age' column.
income_df.describe(["age"]).show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|             48842|
|   mean| 38.64358543876172|
| stddev|13.710509934443603|
|    min|                17|
|    max|                90|
+-------+------------------+



In [7]:
# summary() reports the same statistics as describe() plus the 25%/50%/75% quartiles.
income_df.select(["hours_per_week"]).summary().show()

+-------+------------------+
|summary|    hours_per_week|
+-------+------------------+
|  count|             48842|
|   mean|40.422382375824085|
| stddev|12.391444024252301|
|    min|                 1|
|    25%|                40|
|    50%|                40|
|    75%|                45|
|    max|                99|
+-------+------------------+



In [8]:
# 'marital_status' is a string column, so the numeric statistics (mean, stddev,
# quartiles) come back as null; only count, min and max are populated.
income_df.select(income_df["marital_status"]).summary().show()

+-------+--------------+
|summary|marital_status|
+-------+--------------+
|  count|         48842|
|   mean|          null|
| stddev|          null|
|    min|      Divorced|
|    25%|          null|
|    50%|          null|
|    75%|          null|
|    max|       Widowed|
+-------+--------------+



In [9]:
# Number of records per value of 'sex', most frequent first.
(
    income_df
    .groupBy("sex")
    .count()
    .sort("count", ascending=False)
    .show()
)

+------+-----+
|   sex|count|
+------+-----+
|  Male|32650|
|Female|16192|
+------+-----+



In [25]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Categorical columns that will be used as features.
categoricalCols = [
    "workclass",
    "education",
    "marital_status",
    "occupation",
    "relationship",
    "race",
    "sex",
]

# Estimators (they implement fit()) that produce the transformations applied to the dataset.
# StringIndexer writes one "<col>Index" column per categorical column.
# NOTE(review): handleInvalid is left at its default here; presumably a category present
# only in the test split would make transform() fail -- confirm against the StringIndexer docs.
stringIndexer = StringIndexer(
    inputCols=categoricalCols,
    outputCols=[f"{name}Index" for name in categoricalCols],
)

# Splits each category value into its own column and sets 1 where it occurs for the record.
# Reads the "<col>Index" columns and writes one "<col>OHE" column per categorical column.
oneHotEncoder = OneHotEncoder(
    inputCols=stringIndexer.getOutputCols(),
    outputCols=[f"{name}OHE" for name in categoricalCols],
)

# The column we want to predict has two string values, '<=50K' and '>50K'.
# A StringIndexer converts it into a numeric label column.
labelToIndex = StringIndexer(inputCol="income", outputCol="income_label")

In [26]:
from pyspark.ml.feature import VectorAssembler

# Numeric columns, used as-is.
numericCols = [
    "age",
    "fnlwgt",
    "education_num",
    "capital_gain",
    "capital_loss",
    "hours_per_week",
]

# All feature columns: the one-hot encoded categorical columns followed by the numeric ones.
allCols = [f"{name}OHE" for name in categoricalCols] + numericCols

# VectorAssembler is a transformer: it combines several DataFrame columns
# into a single vector column, e.g.
# [age | hours_per_week | education_num] => [20, 40, 13]
vecAssembler = VectorAssembler(
    inputCols=allCols,
    outputCol="features",
)

In [27]:
from pyspark.ml.classification import DecisionTreeClassifier
# Instantiate the Decision Tree classifier: it learns 'income_label'
# from the assembled 'features' vector column.
dtc = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="income_label",
)

In [29]:
from pyspark.ml import Pipeline

# A Pipeline is an ordered sequence of stages and is itself an estimator.
# Stage order matters: index categoricals -> one-hot encode -> index the label
# -> assemble the feature vector -> train the classifier.
pipeline = Pipeline(stages=[stringIndexer, oneHotEncoder, labelToIndex, vecAssembler, dtc])

# Split the data into training (~70%) and test (~30%) sets.
# A fixed seed keeps the split -- and therefore the fitted model and the
# reported accuracy -- the same on every run of the notebook.
train_data, test_data = income_df.randomSplit([0.7, 0.3], seed=42)

# Fitting the pipeline on a DataFrame produces a PipelineModel.
pipelineModel = pipeline.fit(train_data)

# Apply the fitted pipeline model to the held-out test data.
predictionsDF = pipelineModel.transform(test_data)

In [30]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Accuracy: fraction of correct predictions, comparing the model's output
# against the numeric 'income_label' column.
evaluator = MulticlassClassificationEvaluator(
    labelCol="income_label",
    metricName="accuracy",
)
print(f"Acurácia: {evaluator.evaluate(predictionsDF)}")

Acurácia: 0.8438628773938527
