## Predição da ocorrência de derrame cerebral em pacinetes

In [2]:
# Inicializando o Spark

import os

os.environ["SPARK_HOME"] = "C:\Spark\spark-3.1.3-bin-hadoop2.7"

import findspark
findspark.init()

In [3]:
# Criando a sessão Spark

from pyspark.sql import SparkSession

# Spark entry point
spark = SparkSession \
    .builder \
    .appName("Desafio - Cientista de Dados - Apache Spark") \
    .getOrCreate()

spark.version


'3.1.3'

In [4]:
# Carregando a planilha CSV
stroke_df = spark.read.csv('C:/Users/Renan/Spark_desafio/stroke_data.csv',header='True',inferSchema='True')


# Detalhes dos atributos
stroke_df.printSchema()


root
 |-- 0: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [5]:
stroke_df.show(5)

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     

In [6]:
# Quantidade de registros no arquivo
stroke_df.count()

67135

In [7]:
# Número de colunas no arquivo
len(stroke_df.columns)

12

In [8]:
# Número de pacientes que sofreram derrame
column_stroke_df = stroke_df.select("stroke")
x = column_stroke_df.agg({'stroke' : 'sum'}) #Realizei a soma, pois os que sofreram derrame estão indicados com 1 e os demais com 0
x.show()

+-----------+
|sum(stroke)|
+-----------+
|      40287|
+-----------+



In [9]:
# Número de pacientes que não sofreram derrame
no_stroke_df = stroke_df[stroke_df.stroke == 0]
no_stroke_df.count()

26848

In [10]:
#DataFrame para pacinetes que sofreram derrame
positive_stroke_df = stroke_df[stroke_df.stroke == 1]

In [11]:
#Criando um DataFrame temporário a partir do "positive_stroke_df", para possibilitar fazer consultas SQL com o spark.sql
positive_stroke_df.createOrReplaceTempView("temporaria_df")
sqlDF = spark.sql("SELECT * FROM temporaria_df")
sqlDF.show()

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  5|Female|82.0|           0|            0|         Yes|      Private|         Rural|            59.32| 33.2|         smokes|     1|
|  8|Female|37.0|           0|            0|         Yes|      Private|         Rural|            156.7| 36.9|         smokes|     1|
|  9|Female|41.0|           0|            0|         Yes|     Govt_job|         Rural|            64.06| 33.8|         smokes|     1|
| 10|Female|70.0|           0|            0|         Yes|Self-

## Utilizando consultas SQL

In [12]:
# Quantidade de pacinentes que sofreram derrame e trabalham no setor Privado
spark.sql("SELECT work_type FROM temporaria_df WHERE work_type = 'Private'").count()

23711

In [13]:
# Quantidade de pacinentes que sofreram derrame e trabalha no setor Privado
spark.sql("SELECT work_type FROM temporaria_df WHERE work_type = 'Self-employed'").count()

10807

In [14]:
# Quantidade de pacinentes que sofreram derrame e trabalham no governo
spark.sql("SELECT work_type FROM temporaria_df WHERE work_type = 'Govt_job'").count()

5164

In [15]:
#Agrupando e somando as categorias de pessoas e suas ocupações, que sofreram derrame
spark.sql("SELECT work_type, count(*) as work_count FROM temporaria_df WHERE stroke == 1 GROUP BY work_type ORDER BY work_count DESC").show()

+-------------+----------+
|    work_type|work_count|
+-------------+----------+
|      Private|     23711|
|Self-employed|     10807|
|     Govt_job|      5164|
|     children|       520|
| Never_worked|        85|
+-------------+----------+



In [16]:
# Agrupando as incidências de derrame por idade, ordenando da idade de maior para a de menor incidência
spark.sql("SELECT age, count(*) as age_count FROM temporaria_df WHERE stroke == 1 GROUP BY age ORDER BY age_count DESC").show()

+----+---------+
| age|age_count|
+----+---------+
|79.0|     2916|
|78.0|     2279|
|80.0|     1858|
|81.0|     1738|
|82.0|     1427|
|77.0|      994|
|74.0|      987|
|63.0|      942|
|76.0|      892|
|70.0|      881|
|66.0|      848|
|75.0|      809|
|67.0|      801|
|57.0|      775|
|73.0|      759|
|65.0|      716|
|72.0|      709|
|68.0|      688|
|69.0|      677|
|71.0|      667|
+----+---------+
only showing top 20 rows



## Quantidade de pessoas que sofreram derrames após os 50 anos.

In [17]:
spark.sql("SELECT age FROM temporaria_df WHERE  age > 50.0").count()

28938

In [18]:
spark.sql("SELECT AVG(bmi) FROM temporaria_df ").show()

+------------------+
|          avg(bmi)|
+------------------+
|29.942490629729495|
+------------------+



In [19]:
#Tipos de classificações que compõem a coluna "work_type", utilizando select.distinct
stroke_df.select("work_type").distinct().show()

+-------------+
|    work_type|
+-------------+
| Never_worked|
|Self-employed|
|      Private|
|     children|
|     Govt_job|
+-------------+



In [18]:
#Pergunta 5
stroke_df.select("gender").distinct().show()

+------+
|gender|
+------+
|Female|
| Other|
|  Male|
+------+



In [25]:
#spark.sql("SELECT gender, count(*) FROM temporaria_df").show()
spark.sql("SELECT gender, count(*) as work_count FROM temporaria_df WHERE stroke == 1 GROUP BY work_type ORDER BY work_count DESC").show()

AnalysisException: expression 'temporaria_df.`gender`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;
Sort [work_count#414L DESC NULLS LAST], true
+- Aggregate [work_type#102], [gender#97, count(1) AS work_count#414L]
   +- Filter (stroke#107 = 1)
      +- SubqueryAlias temporaria_df
         +- Filter (stroke#107 = 1)
            +- Relation[0#96,gender#97,age#98,hypertension#99,heart_disease#100,ever_married#101,work_type#102,Residence_type#103,avg_glucose_level#104,bmi#105,smoking_status#106,stroke#107] csv


## Criei um novo DF temporário, com todos os pacinetes do estudo, para fazer outras buscas SQL, a partir da pergunta 5

In [20]:
#DataFrame para pacinetes que sofreram derrame
all_stroke_df = stroke_df

#Criando um DataFrame temporário a partir do "positive_stroke_df", para possibilitar fazer consultas SQL com o spark.sql

all_stroke_df.createOrReplaceTempView("temporaria2_df")
sqlDF2 = spark.sql("SELECT * FROM temporaria2_df")
sqlDF2.show()

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     

In [21]:
# Proporção do gênero feminino no estudo
spark.sql("SELECT gender FROM temporaria2_df WHERE gender LIKE 'F%'").count()

39530

In [22]:
# Proporção do gênero masculino no estudo
spark.sql("SELECT gender FROM temporaria2_df WHERE gender LIKE 'M%'").count()

27594

In [23]:
# Proporção de outros gêneros no estudo
spark.sql("SELECT gender FROM temporaria2_df WHERE gender LIKE 'O%'").count()

11

In [None]:
#Pergunta 6

## Influência da hipertensão na incidência de derrames

In [50]:
#Total de pessoas nao-hipertensas
a = spark.sql("SELECT hypertension FROM temporaria2_df WHERE hypertension == 0 ").count()
a

56118

In [55]:
#Total de não hipertensos que sofreram derrame
b = spark.sql("SELECT hypertension FROM temporaria_df WHERE hypertension == 0 ").count()
b

31470

In [52]:
#Proporção de não hipertensos que sofreram derrame
b/a

0.15711536405431412

In [53]:
#Total de pessoas hipertensas
c= spark.sql("SELECT hypertension FROM temporaria2_df WHERE hypertension == 1 ").count()
c

11017

In [54]:
#Total de hipertensos que sofreram derrame
d = spark.sql("SELECT hypertension FROM temporaria_df WHERE hypertension == 1 ").count()
d

8817

In [56]:
#Proporção de não hipertensos que sofreram derrame
d/c

0.8003086139602432

## Nível médio de glicose para pessoas que, respectivamente, sofreram e não sofreram derrame.

In [77]:
#SOFREU
spark.sql("SELECT avg(avg_glucose_level)  FROM temporaria_df").show()

+----------------------+
|avg(avg_glucose_level)|
+----------------------+
|    119.95307046938272|
+----------------------+



In [75]:
#NÃO SOFREU
spark.sql("SELECT avg(avg_glucose_level) hypertension FROM temporaria2_df").show()

+------------------+
|      hypertension|
+------------------+
|113.41439606762462|
+------------------+



## Nível médio de BMI (IMC = índice de massa corpórea) para pessoas que, respectivamente, sofreram e não sofreram derrame.

In [24]:
#SOFREU
spark.sql("SELECT avg(bmi)  FROM temporaria_df").show()

+------------------+
|          avg(bmi)|
+------------------+
|29.942490629729495|
+------------------+



In [80]:
#NÃO SOFREU
spark.sql("SELECT avg(bmi) hypertension FROM temporaria2_df").show()

+-----------------+
|     hypertension|
+-----------------+
|29.16154047813857|
+-----------------+



In [78]:
#Exemplo com groupBy
stroke_df.groupBy("age", "gender").count().show()


+----+------+-----+
| age|gender|count|
+----+------+-----+
| 6.0|Female|  126|
| 5.0|Female|  258|
|10.0|  Male|  170|
|77.0|Female|  856|
| 3.0|  Male|  209|
|57.0|  Male|  620|
|1.16|Female|   24|
|66.0|  Male|  509|
|81.0|Female| 1319|
|11.0|  Male|  167|
| 4.0|Female|  182|
|1.72|Female|   23|
|30.0|Female|  343|
|67.0|Female|  600|
| 8.0|  Male|  225|
|53.0| Other|    2|
|0.56|Female|   21|
|12.0|Female|  178|
| 8.0|Female|  211|
|47.0|  Male|  369|
+----+------+-----+
only showing top 20 rows



## Previsão da ocorrência de derrame nos pacientes, utilizando Árvore de Decisão

In [25]:
stroke_df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [26]:
stroke_df.show(5)

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     

In [27]:
#Selecionando as colunas que eu vou usar para a Árvore de Decisão

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['age', 'hypertension', 'heart_disease', 'avg_glucose_level'], outputCol='features')

In [28]:
#Usando a Árvore de decisão

# Em 'labelCol' coloco a coluna que vamos fazer as predições
from pyspark.ml.classification import DecisionTreeClassifier

classifier = DecisionTreeClassifier(labelCol='stroke', featuresCol='features')

classifier

DecisionTreeClassifier_555ccbc5c4a4

In [30]:
# Utilizando o pipeline, que é um fluxo de trabalho completo que combina vários algoritmos de aprendizado de máquina
#Eles definem os estágios e a ordenação de um processo de apredizagem de máquina

from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, classifier])

In [31]:
#Separando parte dos dados para treinos e testes

train_data, test_data = stroke_df.randomSplit([0.7, 0.3])

In [33]:
#Criando um modelo criado com os dados de treinamento

predictStrokedModel = pipeline.fit(train_data)

In [34]:
#Realizando as predições com o modelo criado

predictions = predictStrokedModel.transform(test_data)
predictions.select('age', 'hypertension', 'heart_disease', 'avg_glucose_level', 'rawPrediction', 'prediction').show(5)

+----+------------+-------------+-----------------+---------------+----------+
| age|hypertension|heart_disease|avg_glucose_level|  rawPrediction|prediction|
+----+------------+-------------+-----------------+---------------+----------+
|82.0|           0|            0|            59.32|[1615.0,9714.0]|       1.0|
|33.0|           0|            0|           193.42|[3555.0,3052.0]|       0.0|
|37.0|           0|            0|            156.7|[4981.0,5342.0]|       1.0|
|72.0|           0|            1|           235.22| [835.0,2941.0]|       1.0|
|20.0|           0|            0|           104.78|[3555.0,3052.0]|       0.0|
+----+------------+-------------+-----------------+---------------+----------+
only showing top 5 rows



In [35]:
#Calculando a precisão do modelo criado

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='stroke', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

accuracy


0.6864343958487769

Obteve-se uma precisão de 68,6% com o modelo criado, utilizando as colunas referentes à idade, hipertensão, doença cardíaca e nível médio de glicose.

## Adicionando mais variáveis para tentar melhorar a precisão da predição

In [37]:
#Incluindo a variável categórica gênero, transformando o conteúdo dessa coluna em vetores, para ser utilizado na predição

from pyspark.ml.feature import StringIndexer, OneHotEncoder


#Indexamos os nomes em uma nova coluna
gender_indexer = StringIndexer(inputCol='gender', outputCol='genderIndex')

#Criamos uma coluna de vetores que será utilizada na predição
gender_encoder = OneHotEncoder(inputCol='genderIndex', outputCol='genderVector')


#Incluindo a variável categórica gênero, fazendo a mesma transformação para vetores

from pyspark.ml.feature import StringIndexer, OneHotEncoder

smoke_indexer = StringIndexer(inputCol='smoking_status', outputCol='smokeIndex')
smoke_encoder = OneHotEncoder(inputCol='smokeIndex', outputCol='smokeVector')



In [38]:
#Adicionando as colunas vetorias recém-criadas, smokeVector e genderVector

from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['age', 'hypertension', 'heart_disease', 'avg_glucose_level', 'smokeVector', 'genderVector'], outputCol='features')

In [39]:
#Usando a Árvore de decisão 

# Em 'labelCol' coloca a coluna que vamos fazer as predições
from pyspark.ml.classification import DecisionTreeClassifier

classifier = DecisionTreeClassifier(labelCol='stroke', featuresCol='features')

classifier

DecisionTreeClassifier_8a283e793eb2

In [40]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[gender_indexer, gender_encoder, smoke_indexer, smoke_encoder, assembler, classifier])

In [41]:
train_data, test_data = stroke_df.randomSplit([0.7, 0.3])

In [42]:
predictStrokedModel = pipeline.fit(train_data)

In [43]:
predictions = predictStrokedModel.transform(test_data)
predictions.select('age', 'hypertension', 'heart_disease', 'avg_glucose_level','genderVector' , 'gender','smokeVector','smoking_status', 'prediction').show(5)

+----+------------+-------------+-----------------+-------------+------+-------------+---------------+----------+
| age|hypertension|heart_disease|avg_glucose_level| genderVector|gender|  smokeVector| smoking_status|prediction|
+----+------------+-------------+-----------------+-------------+------+-------------+---------------+----------+
|18.0|           0|            0|            94.19|(2,[0],[1.0])|Female|(2,[0],[1.0])|         smokes|       1.0|
|41.0|           0|            0|            64.06|(2,[0],[1.0])|Female|(2,[0],[1.0])|         smokes|       1.0|
|72.0|           0|            1|           235.22|(2,[1],[1.0])|  Male|(2,[1],[1.0])|formerly smoked|       1.0|
|41.0|           0|            0|            159.3|(2,[1],[1.0])|  Male|(2,[0],[1.0])|         smokes|       1.0|
|69.0|           0|            0|            64.06|(2,[1],[1.0])|  Male|(2,[1],[1.0])|formerly smoked|       0.0|
+----+------------+-------------+-----------------+-------------+------+-------------+--

In [45]:
#Calculando novamente a precisão do nosso modelo, agora adicionada as colunas que referem ao status de fumante e gênero

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='stroke', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(predictions)

accuracy

0.8318636678886356

Notamos um bom aumento na precisão de nossa predição, chegando a casa dos 83,1% de precisão com relação aos casos de pacinetes que sofrerão derrame, com a adição das variáveis fumante e gênero.

In [53]:
#Obtendo parâmetros da Árvore de Decisão: Profundidade, número de nós, número de classes, número de features

#Colquei 'stages[-1]' porque pegou o ultimo elemento da lista 'predictStrokedModel', como pode ver no print
decisionTreeModel = predictStrokedModel.stages[-1]

display(decisionTreeModel)

#decisionTreeModel.depth

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_8a283e793eb2, depth=5, numNodes=19, numClasses=2, numFeatures=8

In [52]:
#Profundidade da Árvore de Decisão
decisionTreeModel.depth

5

## Grau de relevância de cada variável para a incidência de derrame

In [54]:
list(zip(assembler.getInputCols(), decisionTreeModel.featureImportances))

[('age', 0.17213452387853628),
 ('hypertension', 0.0),
 ('heart_disease', 0.0),
 ('avg_glucose_level', 0.007431288840423885),
 ('smokeVector', 0.4879526909808828),
 ('genderVector', 0.331995578154012)]

Pudemos notar que a variável com maior imapacto foi o status de fumante, seguida do gênero e idade. De acordo com nosso modelo, a hipertensão e doenças cardíacas não tiveram influência para o aumento dos casos de derrames.

In [51]:
decisionTreeModel.toDebugString

'DecisionTreeClassificationModel: uid=DecisionTreeClassifier_8a283e793eb2, depth=5, numNodes=19, numClasses=2, numFeatures=8\n  If (feature 4 in {0.0})\n   If (feature 5 in {0.0})\n    Predict: 0.0\n   Else (feature 5 not in {0.0})\n    If (feature 0 <= 56.5)\n     Predict: 0.0\n    Else (feature 0 > 56.5)\n     If (feature 0 <= 73.5)\n      If (feature 3 <= 74.16)\n       Predict: 0.0\n      Else (feature 3 > 74.16)\n       Predict: 1.0\n     Else (feature 0 > 73.5)\n      Predict: 1.0\n  Else (feature 4 not in {0.0})\n   If (feature 0 <= 66.5)\n    Predict: 1.0\n   Else (feature 0 > 66.5)\n    If (feature 0 <= 73.5)\n     Predict: 1.0\n    Else (feature 0 > 73.5)\n     If (feature 6 in {0.0})\n      If (feature 3 <= 58.465)\n       Predict: 0.0\n      Else (feature 3 > 58.465)\n       Predict: 1.0\n     Else (feature 6 not in {0.0})\n      Predict: 1.0\n'