# **Aula 1 - Preparando os Dados**

## **1.1 Apresentação**

## **1.2 Preparando o Ambiente**

### PySpark

PySpark é uma interface para Apache Spark em Python. Ele não apenas permite que você escreva aplicativos Spark usando APIs Python, mas também fornece o *shell* PySpark para analisar interativamente seus dados em um ambiente distribuído. O PySpark oferece suporte à maioria dos recursos do Spark, como Spark SQL, DataFrame, Streaming, MLlib (Machine Learning) e Spark Core.

<center><img src="https://caelum-online-public.s3.amazonaws.com/2273-introducao-spark/01/img-001.png"/></center>

#### **Spark SQL e DataFrame**

Spark SQL é um módulo Spark para processamento de dados estruturados. Ele fornece uma abstração de programação chamada DataFrame e também pode atuar como mecanismo de consulta SQL distribuído.

#### **Spark Streaming**

Executando em cima do Spark, o recurso de *streaming* no Apache Spark possibilita o uso de poderosas aplicações interativas e analíticas em *streaming* e dados históricos, enquanto herda a facilidade de uso do Spark e as características de tolerância a falhas.

#### **Spark MLlib**

Construído sobre o Spark, MLlib é uma biblioteca de aprendizado de máquina escalonável que fornece um conjunto uniforme de APIs de alto nível que ajudam os usuários a criar e ajustar *pipelines* de aprendizado de máquina práticos.

#### **Spark Core**

Spark Core é o mecanismo de execução geral subjacente para a plataforma Spark sobre o qual todas as outras funcionalidades são construídas. Ele fornece um RDD (*Resilient Distributed Dataset*) e recursos de computação na memória.

<font size=2>**Fonte:** [PySpark](https://spark.apache.org/docs/latest/api/python/index.html)</font>

In [1]:
# !pip install pyspark

### SparkSession

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

<font size=2>**Fonte:** [SparkSession](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.html)</font>

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.master('local[*]').appName("Classificação com Spark").getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/17 05:52:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## **1.3 Carregamento dos Dados**

In [4]:
dados = spark.read.csv('dados/dados_clientes.csv', sep=',', header=True, inferSchema=True)

In [5]:
dados

DataFrame[id: int, Churn: string, Mais65anos: int, Conjuge: string, Dependentes: string, MesesDeContrato: int, TelefoneFixo: string, MaisDeUmaLinhaTelefonica: string, Internet: string, SegurancaOnline: string, BackupOnline: string, SeguroDispositivo: string, SuporteTecnico: string, TVaCabo: string, StreamingFilmes: string, TipoContrato: string, ContaCorreio: string, MetodoPagamento: string, MesesCobrados: double]

In [6]:
dados.show()

+---+-----+----------+-------+-----------+---------------+------------+------------------------+-----------+------------------+------------------+------------------+------------------+------------------+------------------+------------+------------+----------------+-------------+
| id|Churn|Mais65anos|Conjuge|Dependentes|MesesDeContrato|TelefoneFixo|MaisDeUmaLinhaTelefonica|   Internet|   SegurancaOnline|      BackupOnline| SeguroDispositivo|    SuporteTecnico|           TVaCabo|   StreamingFilmes|TipoContrato|ContaCorreio| MetodoPagamento|MesesCobrados|
+---+-----+----------+-------+-----------+---------------+------------+------------------------+-----------+------------------+------------------+------------------+------------------+------------------+------------------+------------+------------+----------------+-------------+
|  0|  Nao|         0|    Sim|        Nao|              1|         Nao|    SemServicoTelefonico|        DSL|               Nao|               Sim|              

In [7]:
dados.count()

10348

## Target

In [8]:
dados.select('Churn').groupBy('Churn').count().show()

+-----+-----+
|Churn|count|
+-----+-----+
|  Sim| 5174|
|  Nao| 5174|
+-----+-----+



In [9]:
dados.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Churn: string (nullable = true)
 |-- Mais65anos: integer (nullable = true)
 |-- Conjuge: string (nullable = true)
 |-- Dependentes: string (nullable = true)
 |-- MesesDeContrato: integer (nullable = true)
 |-- TelefoneFixo: string (nullable = true)
 |-- MaisDeUmaLinhaTelefonica: string (nullable = true)
 |-- Internet: string (nullable = true)
 |-- SegurancaOnline: string (nullable = true)
 |-- BackupOnline: string (nullable = true)
 |-- SeguroDispositivo: string (nullable = true)
 |-- SuporteTecnico: string (nullable = true)
 |-- TVaCabo: string (nullable = true)
 |-- StreamingFilmes: string (nullable = true)
 |-- TipoContrato: string (nullable = true)
 |-- ContaCorreio: string (nullable = true)
 |-- MetodoPagamento: string (nullable = true)
 |-- MesesCobrados: double (nullable = true)



## **1.4 Transformando os Dados**

<font size=2>**Fonte:** [Functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions)</font>

In [10]:
colunasBinarias = [
    'Churn',
    'Conjuge',
    'Dependentes',
    'TelefoneFixo',
    'MaisDeUmaLinhaTelefonica',
    'SegurancaOnline',
    'BackupOnline',
    'SeguroDispositivo',
    'SuporteTecnico',
    'TVaCabo',
    'StreamingFilmes',
    'ContaCorreio'
]

In [11]:
from pyspark.sql import functions as f

### Preparando os dados

Transformando os dados em 0 e 1 

In [12]:
todasColunas = [f.when(f.col(c)=='Sim', 1).otherwise(0).alias(c) for c in colunasBinarias]
for coluna in reversed(dados.columns):
  if coluna not in colunasBinarias:
    todasColunas.insert(0, coluna)
todasColunas

['id',
 'Mais65anos',
 'MesesDeContrato',
 'Internet',
 'TipoContrato',
 'MetodoPagamento',
 'MesesCobrados',
 Column<'CASE WHEN (Churn = Sim) THEN 1 ELSE 0 END AS Churn'>,
 Column<'CASE WHEN (Conjuge = Sim) THEN 1 ELSE 0 END AS Conjuge'>,
 Column<'CASE WHEN (Dependentes = Sim) THEN 1 ELSE 0 END AS Dependentes'>,
 Column<'CASE WHEN (TelefoneFixo = Sim) THEN 1 ELSE 0 END AS TelefoneFixo'>,
 Column<'CASE WHEN (MaisDeUmaLinhaTelefonica = Sim) THEN 1 ELSE 0 END AS MaisDeUmaLinhaTelefonica'>,
 Column<'CASE WHEN (SegurancaOnline = Sim) THEN 1 ELSE 0 END AS SegurancaOnline'>,
 Column<'CASE WHEN (BackupOnline = Sim) THEN 1 ELSE 0 END AS BackupOnline'>,
 Column<'CASE WHEN (SeguroDispositivo = Sim) THEN 1 ELSE 0 END AS SeguroDispositivo'>,
 Column<'CASE WHEN (SuporteTecnico = Sim) THEN 1 ELSE 0 END AS SuporteTecnico'>,
 Column<'CASE WHEN (TVaCabo = Sim) THEN 1 ELSE 0 END AS TVaCabo'>,
 Column<'CASE WHEN (StreamingFilmes = Sim) THEN 1 ELSE 0 END AS StreamingFilmes'>,
 Column<'CASE WHEN (ContaCorr

In [13]:
dados.select(todasColunas).show()

+---+----------+---------------+-----------+------------+----------------+-------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+
| id|Mais65anos|MesesDeContrato|   Internet|TipoContrato| MetodoPagamento|MesesCobrados|Churn|Conjuge|Dependentes|TelefoneFixo|MaisDeUmaLinhaTelefonica|SegurancaOnline|BackupOnline|SeguroDispositivo|SuporteTecnico|TVaCabo|StreamingFilmes|ContaCorreio|
+---+----------+---------------+-----------+------------+----------------+-------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+
|  0|         0|              1|        DSL| Mensalmente|BoletoEletronico|        29.85|    0|      1|          0|           0|                       0|              0|           1|                0|             0|      0|              0|      

In [14]:
dataset = dados.select(todasColunas)

In [15]:
dataset.show()

+---+----------+---------------+-----------+------------+----------------+-------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+
| id|Mais65anos|MesesDeContrato|   Internet|TipoContrato| MetodoPagamento|MesesCobrados|Churn|Conjuge|Dependentes|TelefoneFixo|MaisDeUmaLinhaTelefonica|SegurancaOnline|BackupOnline|SeguroDispositivo|SuporteTecnico|TVaCabo|StreamingFilmes|ContaCorreio|
+---+----------+---------------+-----------+------------+----------------+-------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+
|  0|         0|              1|        DSL| Mensalmente|BoletoEletronico|        29.85|    0|      1|          0|           0|                       0|              0|           1|                0|             0|      0|              0|      

## **1.5 Criando *Dummies***

In [16]:
dataset.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Mais65anos: integer (nullable = true)
 |-- MesesDeContrato: integer (nullable = true)
 |-- Internet: string (nullable = true)
 |-- TipoContrato: string (nullable = true)
 |-- MetodoPagamento: string (nullable = true)
 |-- MesesCobrados: double (nullable = true)
 |-- Churn: integer (nullable = false)
 |-- Conjuge: integer (nullable = false)
 |-- Dependentes: integer (nullable = false)
 |-- TelefoneFixo: integer (nullable = false)
 |-- MaisDeUmaLinhaTelefonica: integer (nullable = false)
 |-- SegurancaOnline: integer (nullable = false)
 |-- BackupOnline: integer (nullable = false)
 |-- SeguroDispositivo: integer (nullable = false)
 |-- SuporteTecnico: integer (nullable = false)
 |-- TVaCabo: integer (nullable = false)
 |-- StreamingFilmes: integer (nullable = false)
 |-- ContaCorreio: integer (nullable = false)



In [17]:
dataset.select(['Internet', 'TipoContrato', 'MetodoPagamento']).show()

+-----------+------------+----------------+
|   Internet|TipoContrato| MetodoPagamento|
+-----------+------------+----------------+
|        DSL| Mensalmente|BoletoEletronico|
|        DSL|       UmAno|          Boleto|
|        DSL| Mensalmente|          Boleto|
|        DSL|       UmAno|   DebitoEmConta|
|FibraOptica| Mensalmente|BoletoEletronico|
|FibraOptica| Mensalmente|BoletoEletronico|
|FibraOptica| Mensalmente|   CartaoCredito|
|        DSL| Mensalmente|          Boleto|
|FibraOptica| Mensalmente|BoletoEletronico|
|        DSL|       UmAno|   DebitoEmConta|
|        DSL| Mensalmente|          Boleto|
|        Nao|    DoisAnos|   CartaoCredito|
|FibraOptica|       UmAno|   CartaoCredito|
|FibraOptica| Mensalmente|   DebitoEmConta|
|FibraOptica| Mensalmente|BoletoEletronico|
|FibraOptica|    DoisAnos|   CartaoCredito|
|        Nao|       UmAno|          Boleto|
|FibraOptica|    DoisAnos|   DebitoEmConta|
|        DSL| Mensalmente|   CartaoCredito|
|FibraOptica| Mensalmente|Boleto

In [18]:
# coloca 1 quando tem o valor e 0 quando não tem 
dataset.groupBy('id').pivot('MetodoPagamento').agg(f.lit(1)).na.fill(0).show()

+-----+------+----------------+-------------+-------------+
|   id|Boleto|BoletoEletronico|CartaoCredito|DebitoEmConta|
+-----+------+----------------+-------------+-------------+
| 3997|     0|               0|            1|            0|
| 7554|     0|               1|            0|            0|
| 6336|     0|               1|            0|            0|
| 6357|     0|               1|            0|            0|
| 9427|     0|               0|            1|            0|
| 2659|     0|               0|            1|            0|
|  471|     0|               1|            0|            0|
| 4935|     0|               0|            1|            0|
| 4818|     0|               0|            1|            0|
| 1342|     1|               0|            0|            0|
| 1959|     0|               1|            0|            0|
| 9376|     0|               0|            1|            0|
| 2366|     0|               1|            0|            0|
| 1580|     0|               0|         

In [19]:
Internet = dataset.groupBy('id').pivot('Internet').agg(f.lit(1)).na.fill(0)
TipoContrato = dataset.groupBy('id').pivot('TipoContrato').agg(f.lit(1)).na.fill(0)
MetodoPagamento = dataset.groupBy('id').pivot('MetodoPagamento').agg(f.lit(1)).na.fill(0)

In [20]:
# juntando os dados - renomeando as colunas - removendo colunas 
dataset\
    .join(Internet, 'id', how='inner')\
    .join(TipoContrato, 'id', how='inner')\
    .join(MetodoPagamento, 'id', how='inner')\
    .select(
        '*',
        f.col('DSL').alias('Internet_DSL'),
        f.col('FibraOptica').alias('Internet_FibraOptica'),
        f.col('Nao').alias('Internet_Nao'),
        f.col('Mensalmente').alias('TipoContrato_Mensalmente'),
        f.col('UmAno').alias('TipoContrato_UmAno'),
        f.col('DoisAnos').alias('TipoContrato_DoisAnos'),
        f.col('DebitoEmConta').alias('MetodoPagamento_DebitoEmConta'),
        f.col('CartaoCredito').alias('MetodoPagamento_CartaoCredito'),
        f.col('BoletoEletronico').alias('MetodoPagamento_BoletoEletronico'),
        f.col('Boleto').alias('MetodoPagamento_Boleto')
    ).drop(
        'Internet', 'TipoContrato', 'MetodoPagamento', 'DSL',
        'FibraOptica', 'Nao', 'Mensalmente', 'UmAno', 'DoisAnos',
        'DebitoEmConta', 'CartaoCredito', 'BoletoEletronico', 'Boleto'
    ).show()

24/03/17 05:53:05 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----+----------+---------------+-----------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+------------+--------------------+------------+------------------------+------------------+---------------------+-----------------------------+-----------------------------+--------------------------------+----------------------+
|  id|Mais65anos|MesesDeContrato|    MesesCobrados|Churn|Conjuge|Dependentes|TelefoneFixo|MaisDeUmaLinhaTelefonica|SegurancaOnline|BackupOnline|SeguroDispositivo|SuporteTecnico|TVaCabo|StreamingFilmes|ContaCorreio|Internet_DSL|Internet_FibraOptica|Internet_Nao|TipoContrato_Mensalmente|TipoContrato_UmAno|TipoContrato_DoisAnos|MetodoPagamento_DebitoEmConta|MetodoPagamento_CartaoCredito|MetodoPagamento_BoletoEletronico|MetodoPagamento_Boleto|
+----+----------+---------------+-----------------+-----+-------+-----------+------------+----------------------

In [21]:
# salvando a alteração no dataset
dataset = dataset\
    .join(Internet, 'id', how='inner')\
    .join(TipoContrato, 'id', how='inner')\
    .join(MetodoPagamento, 'id', how='inner')\
    .select(
        '*',
        f.col('DSL').alias('Internet_DSL'),
        f.col('FibraOptica').alias('Internet_FibraOptica'),
        f.col('Nao').alias('Internet_Nao'),
        f.col('Mensalmente').alias('TipoContrato_Mensalmente'),
        f.col('UmAno').alias('TipoContrato_UmAno'),
        f.col('DoisAnos').alias('TipoContrato_DoisAnos'),
        f.col('DebitoEmConta').alias('MetodoPagamento_DebitoEmConta'),
        f.col('CartaoCredito').alias('MetodoPagamento_CartaoCredito'),
        f.col('BoletoEletronico').alias('MetodoPagamento_BoletoEletronico'),
        f.col('Boleto').alias('MetodoPagamento_Boleto')
    ).drop(
        'Internet', 'TipoContrato', 'MetodoPagamento', 'DSL',
        'FibraOptica', 'Nao', 'Mensalmente', 'UmAno', 'DoisAnos',
        'DebitoEmConta', 'CartaoCredito', 'BoletoEletronico', 'Boleto'
    )

In [22]:
dataset.show()

+----+----------+---------------+-----------------+-----+-------+-----------+------------+------------------------+---------------+------------+-----------------+--------------+-------+---------------+------------+------------+--------------------+------------+------------------------+------------------+---------------------+-----------------------------+-----------------------------+--------------------------------+----------------------+
|  id|Mais65anos|MesesDeContrato|    MesesCobrados|Churn|Conjuge|Dependentes|TelefoneFixo|MaisDeUmaLinhaTelefonica|SegurancaOnline|BackupOnline|SeguroDispositivo|SuporteTecnico|TVaCabo|StreamingFilmes|ContaCorreio|Internet_DSL|Internet_FibraOptica|Internet_Nao|TipoContrato_Mensalmente|TipoContrato_UmAno|TipoContrato_DoisAnos|MetodoPagamento_DebitoEmConta|MetodoPagamento_CartaoCredito|MetodoPagamento_BoletoEletronico|MetodoPagamento_Boleto|
+----+----------+---------------+-----------------+-----+-------+-----------+------------+----------------------

# Preparando os dados para o modelo

In [23]:
from pyspark.ml.feature import VectorAssembler

In [24]:
dataset = dataset.withColumnRenamed('Churn','label')

In [25]:
x = dataset.columns
x.remove('label')
x.remove('id')

In [26]:
assembler = VectorAssembler(inputCols=x, outputCol='features')
dataset_prep = assembler.transform(dataset).select('features','label')

In [27]:
dataset_prep.show(10, truncate=False)

# 24 quantidade features
# representação do vetor com coluna que não são 0
# valores da lista 

+-----------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                   |label|
+-----------------------------------------------------------------------------------------------------------+-----+
|(24,[1,2,11,12,13,14,17,22],[1.0,45.30540797610398,1.0,1.0,1.0,1.0,1.0,1.0])                               |1    |
|(24,[1,2,3,5,6,8,9,11,12,13,15,17,22],[60.0,103.6142230120257,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|1    |
|(24,[1,2,5,6,10,11,12,13,14,18,23],[12.0,75.85,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                       |0    |
|(24,[1,2,3,5,8,12,13,14,19,21],[69.0,61.45,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                               |0    |
|(24,[1,2,3,5,6,11,13,15,17,22],[7.0,86.5,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                                 |1    |
|(24,[1,2,5,6,12,13,15,17,22],[14.0,85.03742670311915,1.0,1.0,1.0,1.0,1.

In [28]:
seed = 1

# Treino e Teste

In [29]:
treino, teste = dataset_prep.randomSplit([0.7,0.3], seed=seed)

In [30]:
from pyspark.ml.classification import LogisticRegression

In [31]:
lr = LogisticRegression()
modelo_lr = lr.fit(treino)

In [32]:
prev = modelo_lr.transform(teste)

In [33]:
prev.show()

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(24,[0,1,2,3,4,5,...|    0|[2.97380446663811...|[0.95137657107823...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|[-0.1167460666440...|[0.47084658837155...|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[1.31077163582464...|[0.78764224982611...|       0.0|
|(24,[0,1,2,3,4,5,...|    1|[0.38062820875640...|[0.59402461030785...|       0.0|
|(24,[0,1,2,3,4,5,...|    1|[0.75936952290207...|[0.68121683473884...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|[-1.2040472816188...|[0.23075601021309...|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[1.72686921048367...|[0.84901151972742...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|[4.12184229757157...|[0.98404410394478...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|[0.11573553250188...|[0.52890162957129...|       0.0|
|(24,[0,1,2,3,4,

In [34]:
resume = modelo_lr.summary

In [36]:
resume.accuracy

0.7843406593406593

In [37]:
print("Acurácia: %f" % resume.accuracy)
print("Precisão: %f" % resume.precisionByLabel[1])
print("Recall: %f" % resume.recallByLabel[1])
print("F1: %f" % resume.fMeasureByLabel()[1])

Acurácia: 0.784341
Precisão: 0.770602
Recall: 0.813915
F1: 0.791667


In [38]:
prev.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 1)).count()

1225

In [39]:
tp = prev.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 1)).count()
tn = prev.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 0)).count()
fp = prev.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 1)).count()
fn = prev.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 0)).count()
print(tp, tn,fp, fn)

1225 1147 412 284


In [40]:
def calcula_mostra_matriz_confusao(df_transform_modelo, normalize=False, percentage=True):
  tp = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 1)).count()
  tn = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 0)).count()
  fp = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 0) & (f.col('prediction') == 1)).count()
  fn = df_transform_modelo.select('label', 'prediction').where((f.col('label') == 1) & (f.col('prediction') == 0)).count()
  
  valorP = 1
  valorN = 1

  if normalize:
    valorP = tp + fn
    valorN = fp + tn
  
  if percentage and normalize:
    valorP = valorP / 100
    valorN = valorN / 100

  print(' '*20, 'Previsto')
  print(' '*15, 'Churn', ' '*5 ,'Não-Churn')
  print(' '*4, 'Churn', ' '*6, int(tp/valorP), ' '*7, int(fn/valorP))
  print('Real')
  print(' '*4, 'Não-Churn', ' '*2, int(fp/valorN), ' '*7, int(tn/valorN))

In [41]:
calcula_mostra_matriz_confusao(prev, normalize=False)

                     Previsto
                Churn       Não-Churn
     Churn        1225         284
Real
     Não-Churn    412         1147


# Modelo

# Decision Tree Classifier


Colocar um dataframe o resultado

In [42]:
from pyspark.ml.classification import DecisionTreeClassifier

In [44]:
dtc = DecisionTreeClassifier(seed=seed)
dtc_modelo = dtc.fit(treino)

In [46]:
prev_dtc = dtc_modelo.transform(treino)

In [47]:
prev_dtc.show()

+--------------------+-----+--------------+--------------------+----------+
|            features|label| rawPrediction|         probability|prediction|
+--------------------+-----+--------------+--------------------+----------+
|(24,[0,1,2,3,4,5,...|    0|[2062.0,337.0]|[0.85952480200083...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|[2062.0,337.0]|[0.85952480200083...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|    [26.0,3.0]|[0.89655172413793...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|    [26.0,3.0]|[0.89655172413793...|       0.0|
|(24,[0,1,2,3,4,5,...|    0|    [26.0,3.0]|[0.89655172413793...|       0.0|
|(24,[0,1,2,3,4,5,...|    1|[410.0,2123.0]|[0.16186340307935...|       1.0|
|(24,[0,1,2,3,4,5,...|    0| [231.0,206.0]|[0.52860411899313...|       0.0|
|(24,[0,1,2,3,4,5,...|    1|[410.0,2123.0]|[0.16186340307935...|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[410.0,2123.0]|[0.16186340307935...|       1.0|
|(24,[0,1,2,3,4,5,...|    0|[410.0,2123.0]|[0.16186340307935...|       1.0|
|(24,[0,1,2,

In [48]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [49]:
evaluator = MulticlassClassificationEvaluator()

In [50]:
evaluator.evaluate(prev_dtc,{evaluator.metricName: "accuracy"})

0.7912087912087912

In [58]:
print(f"Acurácia: {round(evaluator.evaluate(prev_dtc, {evaluator.metricName: 'accuracy'}), 4)}")
print(f"Precisão: {round(evaluator.evaluate(prev_dtc, {evaluator.metricName: 'precisionByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"Recall: {round(evaluator.evaluate(prev_dtc, {evaluator.metricName: 'recallByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"F1: {round(evaluator.evaluate(prev_dtc,  {evaluator.metricName: 'fMeasureByLabel', evaluator.metricLabel: 1}), 4)}")


Acurácia: 0.7912
Precisão: 0.7968
Recall: 0.7855
F1: 0.7912


In [54]:
val_dtc = dtc_modelo.transform(teste)

In [56]:
evaluator.evaluate(val_dtc,{evaluator.metricName: "accuracy"})

0.7790091264667536

In [59]:
print('Decision Tree Classifier')
print("="*40)
print("Dados de Treino")
print("="*40)
print("Matriz de Confusão")
print("-"*40)
calcula_mostra_matriz_confusao(prev_dtc, normalize=False)
print("-"*40)
print("Métricas")
print("-"*40)
print(f"Acurácia: {round(evaluator.evaluate(prev_dtc, {evaluator.metricName: 'accuracy'}), 4)}")
print(f"Precisão: {round(evaluator.evaluate(prev_dtc, {evaluator.metricName: 'precisionByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"Recall: {round(evaluator.evaluate(prev_dtc, {evaluator.metricName: 'recallByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"F1: {round(evaluator.evaluate(prev_dtc,  {evaluator.metricName: 'fMeasureByLabel', evaluator.metricLabel: 1}), 4)}")
print("")
print("="*40)
print("Dados de Teste")
print("="*40)
print("Matriz de Confusão")
print("-"*40)
calcula_mostra_matriz_confusao(val_dtc, normalize=False)
print("-"*40)
print("Métricas")
print("-"*40)
print(f"Acurácia: {round(evaluator.evaluate(val_dtc, {evaluator.metricName: 'accuracy'}), 4)}")
print(f"Precisão: {round(evaluator.evaluate(val_dtc, {evaluator.metricName: 'precisionByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"Recall: {round(evaluator.evaluate(val_dtc, {evaluator.metricName: 'recallByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"F1: {round(evaluator.evaluate(val_dtc,  {evaluator.metricName: 'fMeasureByLabel', evaluator.metricLabel: 1}), 4)}")

Decision Tree Classifier
Dados de Treino
Matriz de Confusão
----------------------------------------
                     Previsto
                Churn       Não-Churn
     Churn        2879         786
Real
     Não-Churn    734         2881
----------------------------------------
Métricas
----------------------------------------
Acurácia: 0.791209
Precisão: 0.796845
Recall: 0.785539
F1: 0.791151

Dados de Teste
Matriz de Confusão
----------------------------------------
                     Previsto
                Churn       Não-Churn
     Churn        1173         336
Real
     Não-Churn    342         1217
----------------------------------------
Métricas
----------------------------------------
Acurácia: 0.779009
Precisão: 0.774257
Recall: 0.777336
F1: 0.775794


# Random Florest

In [60]:
from pyspark.ml.classification import RandomForestClassifier

In [61]:
rfc = RandomForestClassifier(seed=seed)

In [62]:
rfc_modelo = rfc.fit(treino)

In [63]:
rfc_treino = rfc_modelo.transform(treino)

In [64]:
print(f"Acurácia: {round(evaluator.evaluate(rfc_treino, {evaluator.metricName: 'accuracy'}), 4)}")

Acurácia: 0.7913


In [65]:
rfc_teste = rfc_modelo.transform(teste)

In [66]:
print('Random Florest')
print("="*40)
print("Dados de Treino")
print("="*40)
print("Matriz de Confusão")
print("-"*40)
calcula_mostra_matriz_confusao(rfc_treino, normalize=False)
print("-"*40)
print("Métricas")
print("-"*40)
print(f"Acurácia: {round(evaluator.evaluate(rfc_treino, {evaluator.metricName: 'accuracy'}), 4)}")
print(f"Precisão: {round(evaluator.evaluate(rfc_treino, {evaluator.metricName: 'precisionByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"Recall: {round(evaluator.evaluate(rfc_treino, {evaluator.metricName: 'recallByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"F1: {round(evaluator.evaluate(rfc_treino,  {evaluator.metricName: 'fMeasureByLabel', evaluator.metricLabel: 1}), 4)}")
print("")
print("="*40)
print("Dados de Teste")
print("="*40)
print("Matriz de Confusão")
print("-"*40)
calcula_mostra_matriz_confusao(val_dtc, normalize=False)
print("-"*40)
print("Métricas")
print("-"*40)
print(f"Acurácia: {round(evaluator.evaluate(rfc_teste, {evaluator.metricName: 'accuracy'}), 4)}")
print(f"Precisão: {round(evaluator.evaluate(rfc_teste, {evaluator.metricName: 'precisionByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"Recall: {round(evaluator.evaluate(rfc_teste, {evaluator.metricName: 'recallByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"F1: {round(evaluator.evaluate(rfc_teste,  {evaluator.metricName: 'fMeasureByLabel', evaluator.metricLabel: 1}), 4)}")

Random Florest
Dados de Treino
Matriz de Confusão
----------------------------------------
                     Previsto
                Churn       Não-Churn
     Churn        3107         558
Real
     Não-Churn    961         2654
----------------------------------------
Métricas
----------------------------------------
Acurácia: 0.7913
Precisão: 0.7638
Recall: 0.8477
F1: 0.8036

Dados de Teste
Matriz de Confusão
----------------------------------------
                     Previsto
                Churn       Não-Churn
     Churn        1173         336
Real
     Não-Churn    342         1217
----------------------------------------
Métricas
----------------------------------------
Acurácia: 0.7774
Precisão: 0.7393
Recall: 0.8456
F1: 0.7889


# Cross Validation

In [67]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [75]:
grid = ParamGridBuilder()\
        .addGrid(dtc.maxDepth,[2,5,10])\
        .addGrid(dtc.maxBins, [10,32,45])\
        .build()


In [76]:
dtc_cv = CrossValidator(estimator=dtc, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, seed=seed)

In [77]:
dtc_cv_modelo = dtc_cv.fit(treino)

24/03/17 06:59:52 WARN CacheManager: Asked to cache already cached data.        0]
24/03/17 06:59:52 WARN CacheManager: Asked to cache already cached data.
                                                                                

In [78]:
dtc_cv_prev = dtc_cv_modelo.transform(teste)

In [83]:
print(f"Acurácia: {round(evaluator.evaluate(dtc_cv_prev, {evaluator.metricName: 'accuracy'}), 4)}")
print(f"Precisão: {round(evaluator.evaluate(dtc_cv_prev, {evaluator.metricName: 'precisionByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"Recall: {round(evaluator.evaluate(dtc_cv_prev, {evaluator.metricName: 'recallByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"F1: {round(evaluator.evaluate(dtc_cv_prev,  {evaluator.metricName: 'fMeasureByLabel', evaluator.metricLabel: 1}), 4)}")

Acurácia: 0.7832
Precisão: 0.7468
Recall: 0.8463
F1: 0.7934


In [84]:
grid = ParamGridBuilder()\
        .addGrid(rfc.maxDepth,[2,5,10])\
        .addGrid(rfc.maxBins, [10,32,45])\
        .addGrid(rfc.numTrees, [10,20,50])\
        .build()


In [85]:
rfc_cv = CrossValidator(estimator=rfc, estimatorParamMaps=grid, evaluator=evaluator, numFolds=5, seed=seed)

In [87]:
rfc_cv_treino = rfc_cv.fit(teste)

24/03/17 07:29:38 WARN DAGScheduler: Broadcasting large task binary with size 1119.0 KiB
24/03/17 07:29:39 WARN DAGScheduler: Broadcasting large task binary with size 1301.4 KiB
24/03/17 07:29:39 WARN DAGScheduler: Broadcasting large task binary with size 1823.3 KiB
24/03/17 07:29:39 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/03/17 07:29:40 WARN DAGScheduler: Broadcasting large task binary with size 1519.3 KiB
24/03/17 07:29:41 WARN DAGScheduler: Broadcasting large task binary with size 1100.3 KiB
24/03/17 07:29:42 WARN DAGScheduler: Broadcasting large task binary with size 1323.1 KiB
24/03/17 07:29:43 WARN DAGScheduler: Broadcasting large task binary with size 1851.0 KiB
24/03/17 07:29:43 WARN DAGScheduler: Broadcasting large task binary with size 2.4 MiB
24/03/17 07:29:43 WARN DAGScheduler: Broadcasting large task binary with size 1559.4 KiB
24/03/17 07:29:45 WARN DAGScheduler: Broadcasting large task binary with size 1113.7 KiB
24/03/17 07:29:46 WARN DAGS

In [88]:
rfc_cv_teste = rfc_cv_treino.transform(teste)

In [89]:
print(f"Acurácia: {round(evaluator.evaluate(rfc_cv_teste, {evaluator.metricName: 'accuracy'}), 4)}")
print(f"Precisão: {round(evaluator.evaluate(rfc_cv_teste, {evaluator.metricName: 'precisionByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"Recall: {round(evaluator.evaluate(rfc_cv_teste, {evaluator.metricName: 'recallByLabel', evaluator.metricLabel: 1}), 4)}")
print(f"F1: {round(evaluator.evaluate(rfc_cv_teste,  {evaluator.metricName: 'fMeasureByLabel', evaluator.metricLabel: 1}), 4)}")

24/03/17 07:31:24 WARN DAGScheduler: Broadcasting large task binary with size 1566.9 KiB


Acurácia: 0.8879


24/03/17 07:31:24 WARN DAGScheduler: Broadcasting large task binary with size 1566.9 KiB


Precisão: 0.8473


24/03/17 07:31:24 WARN DAGScheduler: Broadcasting large task binary with size 1566.9 KiB


Recall: 0.9417
F1: 0.892


24/03/17 07:31:25 WARN DAGScheduler: Broadcasting large task binary with size 1566.9 KiB


In [91]:
rfc_cv_treino.bestModel

RandomForestClassificationModel: uid=RandomForestClassifier_0074bd435833, numTrees=50, numClasses=2, numFeatures=24

In [92]:
best = rfc_cv_treino.bestModel

In [95]:
rfc_tunning = RandomForestClassifier(numTrees=50,seed=seed)