### Importação de bibliotecas
execução no jupyter via anaconda

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,
                                OneHotEncoder,StringIndexer)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#caso o codigo seja executado no google colab, instalar as bibliotecas abaixo
!pip install pyspark
!pip install -q findspark
import pyarrow
import pyspark
import pandas as pd
import pyspark.pandas as ps
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
import csv
import matplotlib.pyplot as plt
%matplotlib inline

### Inicia uma sessão Spark

In [2]:
spark = SparkSession.builder.appName("atividade_somativa").getOrCreate()

22/06/18 18:13:08 WARN Utils: Your hostname, douglas resolves to a loopback address: 127.0.1.1; using 192.168.100.32 instead (on interface enp6s0)
22/06/18 18:13:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/18 18:13:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark

### Carrega o dataset gerado já manipulado gerado ao fim da parte_1

In [4]:
df = spark.read.csv("dados_saude.csv",inferSchema=True,header=True)



In [5]:
df.printSchema()

root
 |-- Sexo: string (nullable = true)
 |-- Descrição do CID: string (nullable = true)
 |-- Solicitação de Exames: string (nullable = true)
 |-- Encaminhamento para Atendimento Especialista: string (nullable = true)
 |-- Desencadeou Internamento: integer (nullable = true)
 |-- Abastecimento: string (nullable = true)
 |-- Energia Elétrica: string (nullable = true)
 |-- Tipo de Habitação: string (nullable = true)
 |-- Destino Lixo: string (nullable = true)
 |-- Fezes/Urina: string (nullable = true)



### Selecionada algumas colunas para que abaixo fosse elimiando os nulls. Ao executar o código sem fazer isso um erro ocorria devido a existências de nulls, mesmo tendo os eliminado em etapa enterior.

In [6]:
colunas = df.select(["Sexo",
                     "Descrição do CID",
                     "Solicitação de Exames",
                     "Encaminhamento para Atendimento Especialista",
                     "Desencadeou Internamento",
                     "Abastecimento",
                     "Energia Elétrica",
                     "Tipo de Habitação",
                     "Destino Lixo",
                     "Fezes/Urina"
]
)

In [7]:
df = colunas.na.drop()

### Todos os dados categóricos são transformados em vetores para que possam ser usados no algoritimos de machine learning

In [8]:
sexo_indexer = StringIndexer(inputCol='Sexo',outputCol='SexoIndex')
sexo_encoder = OneHotEncoder(inputCol='SexoIndex',outputCol='SexoVec')

descricao_procedimento_indexer = StringIndexer(inputCol='Descrição do Procedimento',outputCol='Descricao_ProcedimentoIndex')
descricao_procedimento_encoder = OneHotEncoder(inputCol='Descricao_ProcedimentoIndex',outputCol='Descricao_ProcedimentoVec')

In [9]:
descricao_CID_indexer = StringIndexer(inputCol='Descrição do CID',outputCol='Descricao_CIDIndex')
descricao_CID_encoder = OneHotEncoder(inputCol='Descricao_CIDIndex',outputCol='Descricao_CIDVec')

In [10]:
solicitacao_exames_indexer = StringIndexer(inputCol='Solicitação de Exames',outputCol='Solicitacao_ExamesIndex')
solicitacao_exames_enconder = OneHotEncoder(inputCol='Solicitacao_ExamesIndex',outputCol='Solicitacao_ExamesVec')

In [11]:
encaminhamento_especialista_indexer = StringIndexer(inputCol='Encaminhamento para Atendimento Especialista',outputCol='Encaminhamento_EspecialistaIndex')
encaminhamento_especialista_encoder = OneHotEncoder(inputCol='Encaminhamento_EspecialistaIndex',outputCol='Encaminhamento_EspecialistaVec')

In [12]:
desencadeou_internamento_indexer = StringIndexer(inputCol='Desencadeou Internamento',outputCol='Desencadeou_InternamentoIndex')
desencadeou_internamento_encoder = OneHotEncoder(inputCol='Desencadeou_InternamentoIndex',outputCol='Desencadeou_InternamentoVec')

In [13]:
abastecimento_indexer = StringIndexer(inputCol='Abastecimento',outputCol='AbastecimentoIndex')
abastecimentoIndex_encoder = OneHotEncoder(inputCol='AbastecimentoIndex',outputCol='AbastecimentoVec')

In [14]:
energia_eletrica_indexer = StringIndexer(inputCol='Energia Elétrica',outputCol='Energia_EletricaIndex')
energia_eletrica_encoder = OneHotEncoder(inputCol='Energia_EletricaIndex',outputCol='Energia_EletricaVec')

In [15]:
tipo_habitacao_indexer = StringIndexer(inputCol='Tipo de Habitação',outputCol='Tipo_HabitacaoIndex')
tipo_habitacao_encoder = OneHotEncoder(inputCol='Tipo_HabitacaoIndex',outputCol='Tipo_HabitacaoVec')

In [16]:
destino_lixo_indexer = StringIndexer(inputCol='Destino Lixo',outputCol='Destino_LixoIndex')
destino_lixo_encoder = OneHotEncoder(inputCol='Destino_LixoIndex',outputCol='Destino_LixoVec')

In [17]:
fezes_urina_indexer = StringIndexer(inputCol='Fezes/Urina',outputCol='Fezes_UrinaIndex')
fezes_urina_encoder = OneHotEncoder(inputCol='Fezes_UrinaIndex',outputCol='Fezes_UrinaVec')

### A partir dos vetores criados acima é criada uma coluna "features". Posteriormente será usa no algorítimo de classificação

In [18]:
assembler = VectorAssembler(inputCols=[
 'SexoVec',
 'Descricao_CIDVec',
 'Solicitacao_ExamesVec',
 'Encaminhamento_EspecialistaVec',
 'AbastecimentoVec',
'Energia_EletricaVec',
'Tipo_HabitacaoVec',
'Destino_LixoVec',
'Fezes_UrinaVec'],outputCol='features')

### Treina um modelo de Logistic Regression

In [19]:
log_reg= LogisticRegression(featuresCol='features',labelCol='Desencadeou Internamento')

### cria uma pipeline

retirei  descricao_procedimento_indexer, 
                            descricao_procedimento_encoder,

In [20]:
pipeline = Pipeline(stages=[sexo_indexer,sexo_encoder,
                            descricao_CID_indexer,
                            descricao_CID_encoder,
                            solicitacao_exames_indexer,
                            solicitacao_exames_enconder,
                            encaminhamento_especialista_indexer,
                            encaminhamento_especialista_encoder,
                            abastecimento_indexer,
                            abastecimentoIndex_encoder,
                            energia_eletrica_indexer,
                            energia_eletrica_encoder,
                            tipo_habitacao_indexer,
                            tipo_habitacao_encoder,
                            destino_lixo_indexer,
                            destino_lixo_encoder,
                            fezes_urina_indexer,
                            fezes_urina_encoder,
                           assembler,log_reg])

### Separa os dados em treino e teste

In [21]:
train_data, test_data = df.randomSplit([0.7,.3])

### Treina o modelo

In [22]:
fit_model = pipeline.fit(train_data)

                                                                                

### Testa o modelo

In [23]:
resultado = fit_model.transform(test_data)

### Avalia o modelo

In [24]:
minha_avaliacao = BinaryClassificationEvaluator(rawPredictionCol='prediction',
                                       labelCol='Desencadeou Internamento')

In [25]:
resultado.select('Desencadeou Internamento','prediction').show()

+------------------------+----------+
|Desencadeou Internamento|prediction|
+------------------------+----------+
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
+------------------------+----------+
only showing top 20 rows



In [26]:
AUC = minha_avaliacao.evaluate(resultado)



In [27]:
AUC

0.5

### Treina um modelo de Decision Tree

In [28]:
dt = DecisionTreeClassifier(featuresCol='features',labelCol='Desencadeou Internamento')

### Cria outro pipeline com os mesmos dados

retirei descricao_procedimento_indexer, 
                            descricao_procedimento_encoder,

In [29]:
pipeline2 = Pipeline(stages=[sexo_indexer,sexo_encoder,
                            descricao_CID_indexer,
                            descricao_CID_encoder,
                            solicitacao_exames_indexer,
                            solicitacao_exames_enconder,
                            encaminhamento_especialista_indexer,
                            encaminhamento_especialista_encoder,
                            abastecimento_indexer,
                            abastecimentoIndex_encoder,
                            energia_eletrica_indexer,
                            energia_eletrica_encoder,
                            tipo_habitacao_indexer,
                            tipo_habitacao_encoder,
                            destino_lixo_indexer,
                            destino_lixo_encoder,
                            fezes_urina_indexer,
                            fezes_urina_encoder,
                           assembler,dt])

### Treina o modelo

In [30]:
model = pipeline2.fit(train_data)


### Testa o modelo

In [31]:
predictions = model.transform(test_data)

In [32]:
predictions.select('Desencadeou Internamento','prediction').show()

+------------------------+----------+
|Desencadeou Internamento|prediction|
+------------------------+----------+
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
|                       0|       0.0|
+------------------------+----------+
only showing top 20 rows



### Avalia o modelo

In [33]:
evaluator2 = MulticlassClassificationEvaluator(
    labelCol="Desencadeou Internamento", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator2.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

Test Error = 0.00340221 


In [34]:
accuracy

0.9965977927142976