In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler #pre processamento dos dados
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [0]:
spark = SparkSession\
        .builder\
        .appName('Stroke Prediction')\
        .getOrCreate()

In [0]:
#Lendo o arquivo
df = spark.read.csv('/FileStore/tables/stroke_data.csv', header=True, inferSchema=True)

In [0]:
df.show(5)

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     

In [0]:
#quantidade de registros
num_linhas = df.count()
num_linhas

Out[7]: 67135

In [0]:
#visualizando quantidade de colunas e mais detalhes
df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



In [0]:
#analisando a quantidade de pacientes que sofreram e não sofreram derrame
df.groupBy('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|40287|
|     0|26848|
+------+-----+



Primeiro vamos analisar a relação do tipo de trabalho do paciente com a prossibilidade de ter derrame ou não

In [0]:
#criando uma tabela temporária
df.createOrReplaceTempView('dados_pacientes')

#fazendo uma consulta SQL para contar quantos pacientes sofreram derrame por tipo de trabalho
resultado_trabalhos = spark.sql("""
    SELECT work_type, COUNT(*) as count
    FROM dados_pacientes
    WHERE stroke = 1
    GROUP BY work_type
    """)
resultado_trabalhos.show()
#é possível perceber que o setor privado tem o maior número de trabalhadores que sofreram derrame

+-------------+-----+
|    work_type|count|
+-------------+-----+
| Never_worked|   85|
|Self-employed|10807|
|      Private|23711|
|     children|  520|
|     Govt_job| 5164|
+-------------+-----+



Vamos entender agora a proporção de genero dos pacientes

In [0]:
df.createOrReplaceTempView('genero_pacientes')

resultado_generos = spark.sql("""
    SELECT gender, COUNT(*)
    FROM genero_pacientes
    GROUP BY gender
    """)

resultado_generos.show()
#Temos um número maior de mulheres do que homens e outros generos

+------+--------+
|gender|count(1)|
+------+--------+
|Female|   39530|
| Other|      11|
|  Male|   27594|
+------+--------+



Agora quero analisar quantos pacientes hipertensos e não hipertensos tiveram derrame

In [0]:
df.createOrReplaceTempView('pacientes_hipertensos')

#vamos fazer uma consulta para cada grupo e comprar os resultados
consulta_hipertensos = spark.sql("""
    SELECT 'Pacientes com Derrame' AS categoria, SUM(CASE WHEN stroke = 1 THEN 1 ELSE 0 END) AS pacientes_com_derrame
    FROM pacientes_hipertensos
    WHERE hypertension = 1
    UNION ALL
    SELECT 'Pacientes sem Derrame' AS categoria, SUM(CASE WHEN stroke = 0 THEN 1 ELSE 0 END) AS pacientes_sem_derrame
    FROM pacientes_hipertensos
    WHERE hypertension = 1
""")

#agora uma consulta com os pacientes sem hipertensao
df.createOrReplaceTempView('pacientes_nao_hipertensos')

consulta_nao_hipertensos = spark.sql("""
    SELECT 'Pacientes com Derrame' AS categoria, SUM(CASE WHEN stroke = 1 THEN 1 ELSE 0 END) AS pacientes_com_derrame
    FROM pacientes_nao_hipertensos
    WHERE hypertension = 0
    UNION ALL
    SELECT 'Pacientes sem Derrame' AS categoria, SUM(CASE WHEN stroke = 0 THEN 1 ELSE 0 END) AS pacientes_sem_derrame
    FROM pacientes_nao_hipertensos
    WHERE hypertension = 0""")

consulta_hipertensos.show()
consulta_nao_hipertensos.show()

+--------------------+---------------------+
|           categoria|pacientes_com_derrame|
+--------------------+---------------------+
|Pacientes com Der...|                 8817|
|Pacientes sem Der...|                 2200|
+--------------------+---------------------+

+--------------------+---------------------+
|           categoria|pacientes_com_derrame|
+--------------------+---------------------+
|Pacientes com Der...|                31470|
|Pacientes sem Der...|                24648|
+--------------------+---------------------+



In [0]:
#com essas informações podemos calcular a probabilidade de ter derrame nos hipertensos e nos não hipertensos

#calculo de probabilidade para pacientes hipertensos
total_hipertensos = consulta_hipertensos.select('pacientes_com_derrame').first()[0] + consulta_hipertensos.select('pacientes_com_derrame').collect()[1][0]

pacientes_derrame_hipertensos = consulta_hipertensos.filter(consulta_hipertensos['categoria'] == 'Pacientes com Derrame').select('pacientes_com_derrame').first()[0]

probabilidade_derrame_hipertensos = (pacientes_derrame_hipertensos/total_hipertensos)*100

# Cálculo da probabilidade para pacientes não hipertensos
total_nao_hipertensos = consulta_nao_hipertensos.select('pacientes_com_derrame').first()[0] + consulta_nao_hipertensos.select('pacientes_com_derrame').collect()[1][0]

pacientes_com_derrame_nao_hipertensos = consulta_nao_hipertensos.filter(consulta_nao_hipertensos['categoria'] == 'Pacientes com Derrame').select('pacientes_com_derrame').first()[0]

probabilidade_derrame_nao_hipertensos = (pacientes_com_derrame_nao_hipertensos / total_nao_hipertensos)*100

#exibindo resultados
print(f'Probabilidade de um hipertenso ter derrame: {probabilidade_derrame_hipertensos:.2f}%')
print(f'Probabilidade de um nao hipertenso ter derrame: {probabilidade_derrame_nao_hipertensos:.2f}%')

#podemos ver um probabilidade muito maior de que pacientes hipertensos tenham derrame

Probabilidade de um hipertenso ter derrame: 80.03%
Probabilidade de um nao hipertenso ter derrame: 56.08%


Dterminando com qual idade o maior número de pessoas sofreram derrame

In [0]:
df.createOrReplaceTempView('idade_derrame')

consulta_idade = spark.sql("""
    SELECT age, COUNT(*) AS count
    FROM idade_derrame
    WHERE stroke = 1
    GROUP BY age
    ORDER BY count DESC
    """)
consulta_idade.show()

+----+-----+
| age|count|
+----+-----+
|79.0| 2916|
|78.0| 2279|
|80.0| 1858|
|81.0| 1738|
|82.0| 1427|
|77.0|  994|
|74.0|  987|
|63.0|  942|
|76.0|  892|
|70.0|  881|
|66.0|  848|
|75.0|  809|
|67.0|  801|
|57.0|  775|
|73.0|  759|
|65.0|  716|
|72.0|  709|
|68.0|  688|
|69.0|  677|
|71.0|  667|
+----+-----+
only showing top 20 rows



Vamos ver quantos pacientes sofreram derrame após os 50 anos

In [0]:
#filtrando pessoas acima de 50 anos
df_filtrado = df.filter(df.age>50)

#pessoas que sofreram derrame
df_stroke = df_filtrado.filter(df_filtrado.stroke == 1)

#contar o número de pacientes
count_stroke = df_stroke.count()
count_stroke

Out[15]: 28938

Relação entre o nível médio de glicose com ter o derrame

In [0]:
df.createOrReplaceTempView('media_glicose')

consulta_derrame = spark.sql("""
    SELECT AVG(avg_glucose_level) AS media_glicose_derrame
    FROM media_glicose
    WHERE stroke = 1
    """)


consulta_sem_derrame = spark.sql("""
    SELECT AVG(avg_glucose_level) AS media_glicose_sem_derrame
    FROM media_glicose
    WHERE stroke = 0
    """)

#obtendo o resultado das consultas
media_glicose_derrame = consulta_derrame.first()[0]
media_glicose_sem_derrame = consulta_sem_derrame.first()[0]

#exibindo resultados
print(f'A média da glicose em pacientes que sofreram derrame foi: {media_glicose_derrame}')
print(f'A média da glicose em pacientes que nao sofreram derrame foi: {media_glicose_sem_derrame}')

A média da glicose em pacientes que sofreram derrame foi: 119.95307046938272
A média da glicose em pacientes que nao sofreram derrame foi: 103.60273130214506


Agora vamos verificar o IMC de pacientes que sofreram e não sofreram derrame

In [0]:
df.createOrReplaceTempView('media_IMC')

consulta_imc_derrame = spark.sql("""
    SELECT AVG(bmi) AS media_imc
    FROM media_IMC
    WHERE stroke =1
    """)
consulta_imc_sem_derrame = spark.sql("""
    SELECT AVG(bmi) AS media_imc_sem_derrame
    FROM media_IMC
    WHERE stroke = 0
    """)

imc_derrame = consulta_imc_derrame.first()[0]
imc_sem_derrame = consulta_imc_sem_derrame.first()[0]

print(f'A média de IMC dos pacientes que sofreram derrame foi de: {imc_derrame:.2f}')
print(f'A média de IMC dos pacientes que NAO sofreram derrame foi de: {imc_sem_derrame:.2f}')

A média de IMC dos pacientes que sofreram derrame foi de: 29.94
A média de IMC dos pacientes que NAO sofreram derrame foi de: 27.99


Modelo de predição

In [0]:
#pre processamento dos dados
categoricalCols = ["gender", "ever_married", "work_type", "Residence_type", "smoking_status"]

#indexacao variaveis categoricas
stringIndexer = StringIndexer(inputCols = categoricalCols, outputCols=[x + "Index" for x in categoricalCols])
#codificacao One Hot
oneHotEncoder = OneHotEncoder(inputCols=stringIndexer.getOutputCols(), outputCols=[x + "OHE" for x in categoricalCols])


In [0]:
#criando um vetor com as colunas numericas e categoricas
#colunas numericas
numericCols = ["age", "hypertension", "heart_disease", "avg_glucose_level", "bmi", "stroke"]

#todas as colunas
allCols = [c + "OHE" for c in categoricalCols] + numericCols

vecAssembler = VectorAssembler(inputCols = allCols, outputCol='features')

In [0]:
#criando o modelo de arvore de decisao
dtc = DecisionTreeClassifier(labelCol='stroke', featuresCol='features')

In [0]:
#criando o pipeline
pipeline = Pipeline(stages=[stringIndexer, oneHotEncoder, vecAssembler, dtc])

#separando os dados em treino e teste
train_data, test_data = df.randomSplit([0.7, 0.3])

pipelineModel = pipeline.fit(train_data)

#aplicando o pipeline aos dados de teste
prediction_df = pipelineModel.transform(test_data)

In [0]:
#Testando a precisao do modelo
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol='stroke')
print(f'Acurácia: {evaluator.evaluate(prediction_df)}')

Acurácia: 1.0
