# Análise Exploratória e Machine Learning com Spark SQL

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *


spark = SparkSession \
        .builder \
        .master('local') \
        .appName('desafio_modulo2') \
        .getOrCreate()

#### preparação do dataset

In [3]:
from pyspark.sql.types import StructType, DateType, StringType, IntegerType, DoubleType

schema = StructType() \
    .add('Id', IntegerType(), True) \
    .add('Gender', StringType(), True) \
    .add('Age', IntegerType(), True) \
    .add('Hypertension', IntegerType(), True) \
    .add('Heart_disease', IntegerType(), True) \
    .add('Ever_married', StringType(), True) \
    .add('Work_Type', StringType(), True) \
    .add('Residence_Type', StringType(), True) \
    .add('Avg_glucose_level', DoubleType(), True) \
    .add('IMC', DoubleType(), True) \
    .add('Smoking_status', StringType(), True) \
    .add('Stroke', IntegerType(), True)

In [6]:
df = spark.read.format('csv').option('header', True).schema(schema).load('D:\csv\stroke_data.csv')
df.printSchema()
df.show()

root
 |-- Id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Hypertension: integer (nullable = true)
 |-- Heart_disease: integer (nullable = true)
 |-- Ever_married: string (nullable = true)
 |-- Work_Type: string (nullable = true)
 |-- Residence_Type: string (nullable = true)
 |-- Avg_glucose_level: double (nullable = true)
 |-- IMC: double (nullable = true)
 |-- Smoking_status: string (nullable = true)
 |-- Stroke: integer (nullable = true)

+---+------+---+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
| Id|Gender|Age|Hypertension|Heart_disease|Ever_married|    Work_Type|Residence_Type|Avg_glucose_level|  IMC| Smoking_status|Stroke|
+---+------+---+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female| 18|           0|            0|          No|      Private|         Urban|          

#### 1) Quantos registros existem no arquivo?

In [7]:
df.count()

67135

#### 2) Quantas colunas existem no arquivo? Quantas são numéricas?

In [8]:
df.printSchema()
#12 e 7

root
 |-- Id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Hypertension: integer (nullable = true)
 |-- Heart_disease: integer (nullable = true)
 |-- Ever_married: string (nullable = true)
 |-- Work_Type: string (nullable = true)
 |-- Residence_Type: string (nullable = true)
 |-- Avg_glucose_level: double (nullable = true)
 |-- IMC: double (nullable = true)
 |-- Smoking_status: string (nullable = true)
 |-- Stroke: integer (nullable = true)



#### 3) No conjunto de dados, quantos pacientes estavam doentes?

In [9]:
df.groupBy('Heart_disease').count().show()
#6809

+-------------+-----+
|Heart_disease|count|
+-------------+-----+
|            1| 6809|
|            0|60326|
+-------------+-----+



#### 4) A partir do dataframe, crie uma tabela temporária usando df.createOrReplaceTempView('table') e, a seguir, use spark.sql para escrever uma consulta SQL que obtenha quantos pacientes tiveram derrame por tipo de trabalho (work_type). Quantos pacientes que sofreram derrame trabalhavam, respectivamente, no setor privado, de forma independente, no governo e quantas são crianças?


In [10]:
from pyspark.sql.functions import *
df.createOrReplaceTempView('table')

spark.sql('SELECT Work_Type, count(*) as qtd FROM table WHERE Stroke = 1 GROUP BY Work_Type ORDER BY qtd DESC').show()

+-------------+-----+
|    Work_Type|  qtd|
+-------------+-----+
|      Private|23711|
|Self-employed|10807|
|     Govt_job| 5164|
|     children|  520|
| Never_worked|   85|
+-------------+-----+



#### 5) Escreva uma consulta com spark.sql para determinar a proporção, por gênero, de participantes do estudo. A maioria dos participantes é:

In [11]:
spark.sql('SELECT Gender, count(*) as quantidade FROM table GROUP BY Gender ORDER BY quantidade DESC').show()

+------+----------+
|Gender|quantidade|
+------+----------+
|Female|     39530|
|  Male|     27594|
| Other|        11|
+------+----------+



#### 6) Escreva uma consulta com spark.sql para determinar quem tem mais probabilidade de sofrer derrame: hipertensos ou não-hipertensos. Você pode escrever uma consulta para cada grupo. A partir das probabilidades que você obteve, você conclui que: 

In [12]:
print('With Hypertension')
spark.sql('SELECT Stroke, count(*) as quantidade, (count(*) /(SELECT count(*) FROM table WHERE Hypertension = 1) * 100) percentual FROM table WHERE Hypertension = 1 GROUP BY Stroke ORDER BY Stroke').show()

print('Without Hypertension')
spark.sql('SELECT Stroke, count(*) as quantidade, (count(*) /(SELECT count(*) FROM table WHERE Hypertension = 0) * 100) percentual FROM table Where Hypertension = 0 GROUP BY Stroke ORDER BY Stroke').show()

With Hypertension
+------+----------+------------------+
|Stroke|quantidade|        percentual|
+------+----------+------------------+
|     0|      2200|19.969138603975676|
|     1|      8817| 80.03086139602432|
+------+----------+------------------+

Without Hypertension
+------+----------+-----------------+
|Stroke|quantidade|       percentual|
+------+----------+-----------------+
|     0|     24648|43.92173634128087|
|     1|     31470|56.07826365871913|
+------+----------+-----------------+



#### 7) Escreva uma consulta com spark.sql que determine o número de pessoas que sofreram derrame por idade. Com qual idade o maior número de pessoas do conjunto de dados sofreu derrame?

In [13]:
spark.sql('SELECT Age, count(*) as quantidade FROM table GROUP BY Age ORDER BY quantidade DESC').show()

+----+----------+
| Age|quantidade|
+----+----------+
|  79|      3258|
|  78|      2672|
|  80|      2141|
|  81|      2005|
|  82|      1657|
|  63|      1294|
|  66|      1195|
|  77|      1190|
|  74|      1184|
|  57|      1160|
|  70|      1151|
|null|      1091|
|  76|      1088|
|  67|      1070|
|  51|      1067|
|  65|      1046|
|  75|      1015|
|  52|      1003|
|  58|       999|
|  59|       994|
+----+----------+
only showing top 20 rows



#### 8) Usando a API de dataframes, determine quantas pessoas sofreram derrames após os 50 anos.

In [14]:
df.filter('Age > 50 and Stroke == 1').count()

28938

#### 9) Usando spark.sql, determine qual o nível médio de glicose para pessoas que, respectivamente, sofreram e não sofreram derrame.

In [15]:
spark.sql('SELECT Stroke, avg(Avg_glucose_level) FROM table GROUP BY Stroke').show()

+------+----------------------+
|Stroke|avg(Avg_glucose_level)|
+------+----------------------+
|     1|    119.95307046938272|
|     0|    103.60273130214506|
+------+----------------------+



#### 10) Qual é o BMI (IMC = índice de massa corpórea) médio de quem sofreu e não sofreu derrame?

In [16]:
spark.sql('SELECT Stroke, avg(IMC) FROM table GROUP BY Stroke').show()
df.groupBy('stroke').avg('IMC').show()

+------+------------------+
|Stroke|          avg(IMC)|
+------+------------------+
|     1|29.942490629729495|
|     0|27.989678933253657|
+------+------------------+

+------+------------------+
|stroke|          avg(IMC)|
+------+------------------+
|     1|29.942490629729495|
|     0|27.989678933253657|
+------+------------------+



## MACHINE LEARNING

#### 11) Crie um modelo de árvore de decisão que prevê a chance de derrame (stroke) a partir das variáveis contínuas/categóricas: idade, BMI, hipertensão, doença do coração, nível médio de glicose. Use o conteúdo da Segunda Aula Interativa para criar e avaliar o modelo. Qual a acurácia de um modelo construído?

In [17]:
mean_age = df.agg({'Age': 'mean'}).collect()[0][0]

df = df.fillna(mean_age, subset=['Age'])

df.groupBy('Age').count().show()

+---+-----+
|Age|count|
+---+-----+
| 31|  592|
| 65| 1046|
| 53|  842|
| 78| 2672|
| 34|  587|
| 81| 2005|
| 28|  540|
| 76| 1088|
| 26|  503|
| 27|  558|
| 44|  671|
| 12|  398|
| 22|  503|
| 47|  872|
|  1|   34|
| 52| 2094|
| 13|  419|
| 16|  426|
|  6|  246|
|  3|  402|
+---+-----+
only showing top 20 rows



In [18]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=["Age", "IMC", "Hypertension", "Heart_disease", "Avg_glucose_level"], outputCol = 'features')

In [19]:
from pyspark.ml.classification import DecisionTreeClassifier
decision_tree_classifier = DecisionTreeClassifier(labelCol = 'Stroke', featuresCol = 'features')

In [20]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, decision_tree_classifier])

In [21]:
train_data, test_data = df.randomSplit([0.7,0.3])

In [22]:
pipelineModel = pipeline.fit(train_data)

In [24]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

decision_tree_predictions = pipelineModel.transform(test_data)

acc_evaluator = MulticlassClassificationEvaluator(labelCol = 'Stroke', predictionCol = 'prediction', metricName = 'accuracy')

dtc_acc = acc_evaluator.evaluate(decision_tree_predictions)

dtc_acc

0.6866683156072224