<a href="https://colab.research.google.com/github/evanzs/curso-spark/blob/master/pyspark_pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive

# Location inside the Colab VM where Google Drive is mounted; the Spark
# tarball path used by the setup cell lives under this directory.
DRIVE_MOUNT_POINT = '/content/drive'

drive.mount(DRIVE_MOUNT_POINT, force_remount=True)

Mounted at /content/drive


In [2]:

spark_home ='/content/drive/MyDrive/Colab Notebooks/spark-3.5.1-bin-hadoop3.tgz'

# instalar as dependências
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

!tar xf '/content/drive/MyDrive/Colab Notebooks/spark-3.5.1-bin-hadoop3.tgz'
!pip install -q findspark

# configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

# tornar o pyspark "importável"
import findspark
findspark.init('spark-3.5.1-bin-hadoop3')

from pyspark.sql import SparkSession

In [3]:

## Start (or reuse) the SparkSession for this notebook
spark = (
    SparkSession.builder
    .appName("ml")
    .getOrCreate()
)

## ML classes used by the following cells
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import RFormula
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## Load the data: the CSV is ';'-separated and is read with pandas first,
## then converted into a Spark DataFrame.
import pandas as pd
df = pd.read_csv(
    "https://raw.githubusercontent.com/evanzs/curso-spark/master/datasets/Churn.csv",
    sep=';',
)
df_chorn = spark.createDataFrame(df)

# Preview the first 5 rows
df_chorn.show(5)

+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|CreditScore|Geography|Gender|Age|Tenure| Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+
|        619|   France|Female| 42|     2|       0|            1|        1|             1|       10134888|     1|
|        608|    Spain|Female| 41|     1| 8380786|            1|        0|             1|       11254258|     0|
|        502|   France|Female| 42|     8| 1596608|            3|        1|             0|       11393157|     1|
|        699|   France|Female| 39|     1|       0|            2|        0|             0|        9382663|     0|
|        850|    Spain|Female| 43|     2|12551082|            1|        1|             1|         790841|     0|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+-------

In [4]:
## Instantiate and configure the R-style formula transformer.
# "Exited ~ ." uses `Exited` as the label and every other column as a feature;
# the transformer writes the assembled vector to `features` and the label to
# `label`. handleInvalid="skip" is passed so rows with invalid values are
# skipped rather than raising.
formula = RFormula(formula="Exited ~ .",featuresCol="features",labelCol="label",handleInvalid="skip")


# Fit the formula and apply it. NOTE: despite its name, `model_churn` is the
# transformed DataFrame (original columns + features + label), not a model.
model_churn = formula.fit(df_chorn).transform(df_chorn)

## Split the DataFrame 70/30 into train and test.
# A fixed seed makes the split (and therefore the trained tree and the rows
# shown below) reproducible across Restart & Run All; without it randomSplit
# draws a different partition on every execution.
SPLIT_SEED = 42
treino,teste = model_churn.randomSplit([0.7,0.3], seed=SPLIT_SEED)

# Create the classifier
dt_chorn = DecisionTreeClassifier(labelCol="label", featuresCol="features")

# Train the model on the training split
model_dt = dt_chorn.fit(treino)


# Predict on the held-out test split
model_preview = model_dt.transform(teste)

model_preview.show(5)

+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+--------------------+-----+-------------+--------------------+----------+
|CreditScore|Geography|Gender|Age|Tenure| Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|            features|label|rawPrediction|         probability|prediction|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+--------------------+-----+-------------+--------------------+----------+
|        350|    Spain|  Male| 54|     1|15267748|            1|        1|             1|       19197349|     1|[350.0,0.0,0.0,1....|  1.0|[311.0,115.0]|[0.73004694835680...|       0.0|
|        358|    Spain|Female| 52|     8|14354236|            3|        1|             0|       14195911|     1|[358.0,0.0,0.0,0....|  1.0|   [0.0,35.0]|           [0.0,1.0]|       1.0|
|        359|   France|Female| 44|     6|12874769|            1|      

## **Criando e Trabalhando com PIPELINE**

- Criando um pipeline apenas com os estágios de transformação (`formula`) e de modelo (`dt_chorn`).

In [9]:
from pyspark.ml import Pipeline


## Chain the transformation (formula) and the estimator (dt_chorn) created
## earlier into a single two-stage pipeline.
pipeline = Pipeline(
    stages=[
        formula,
        dt_chorn,
    ]
)

## Run every stage on the given dataset.
## NOTE(review): the pipeline is fitted and applied on the same full
## DataFrame, so the predictions shown below are made on training rows.
pipelineModel = pipeline.fit(df_chorn)

## Generate the predictions and preview the first 5 rows
preview = pipelineModel.transform(df_chorn)
preview.show(n=5)


+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+--------------------+-----+--------------+--------------------+----------+
|CreditScore|Geography|Gender|Age|Tenure| Balance|NumOfProducts|HasCrCard|IsActiveMember|EstimatedSalary|Exited|            features|label| rawPrediction|         probability|prediction|
+-----------+---------+------+---+------+--------+-------------+---------+--------------+---------------+------+--------------------+-----+--------------+--------------------+----------+
|        619|   France|Female| 42|     2|       0|            1|        1|             1|       10134888|     1|[619.0,1.0,0.0,0....|  1.0|[6233.0,713.0]|[0.89735099337748...|       0.0|
|        608|    Spain|Female| 41|     1| 8380786|            1|        0|             1|       11254258|     0|[608.0,0.0,0.0,0....|  0.0|[6233.0,713.0]|[0.89735099337748...|       0.0|
|        502|   France|Female| 42|     8| 1596608|            3| 