In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
import os
spark = (SparkSession.builder
         .appName('Stroke Cases')
         .getOrCreate())


In [3]:
#Parametrização e leitura do arquivo de analise com Spark

hist_name = 'stroke_data.csv'

file_name = os.path.join(os.getcwd(), hist_name)

df = spark.read.csv(file_name, header=True, inferSchema=True)

df.show()

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     

In [4]:
# Obter o número de colunas
num_cols = len(df.columns)

# Obter o tipo de dados de cada coluna
col_types = [col.dataType.simpleString() for col in df.schema]

# Imprimir o número de colunas e o tipo de dados de cada coluna
print(f"Number of columns: {num_cols}")
print("Column types:")
for col, col_type in zip(df.columns, col_types):
    print(f"{col}: {col_type}")

Number of columns: 12
Column types:
0: int
gender: string
age: double
hypertension: int
heart_disease: int
ever_married: string
work_type: string
Residence_type: string
avg_glucose_level: double
bmi: double
smoking_status: string
stroke: int


Analise exploratória dos dados

In [5]:
#Quantidade de pacientes que sofreram e não sofreram derrame (stroke), respectivamente.
df.groupBy('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|40287|
|     0|26848|
+------+-----+



In [6]:
#Analise com uma tabela para identificar os parcientes que tiveram derrame por tipo de trabalho.
df.createOrReplaceTempView('table')
stroke_type = df.filter(df['stroke']== 1)
by_work_type = stroke_type.groupBy('work_type').agg({'stroke': 'count'}).distinct()
sort_work_type = by_work_type.orderBy('count(stroke)', ascending=False)
sort_work_type.show()

+-------------+-------------+
|    work_type|count(stroke)|
+-------------+-------------+
|      Private|        23711|
|Self-employed|        10807|
|     Govt_job|         5164|
|     children|          520|
| Never_worked|           85|
+-------------+-------------+



In [7]:
#Proporção do genero da população coletada

gender_query = df.groupBy('gender').count()
total_type = df.count()
proportion_bygroup = gender_query.withColumn('Proportion',(gender_query['count']/total_type)*100)
proportion_bygroup.show()

+------+-----+--------------------+
|gender|count|          Proportion|
+------+-----+--------------------+
|Female|39530|  58.881358456840694|
| Other|   11|0.016384896104863336|
|  Male|27594|   41.10225664705444|
+------+-----+--------------------+



In [8]:
#Analise de derrame por população(Hipertenso e não hipertenso)

strokes=df.filter(df['stroke']==1)

#Grupo 1
hypertension_df1 = strokes.filter(df['hypertension']==1)
group_stroke1 = hypertension_df1.groupBy('stroke').count()
print('Grupo Hypertenso:')
group_stroke1.show()

#Grupo 2
print('Grupo Não-Hypertenso:')
hypertension_df2 = strokes.filter(df['hypertension']==0)
group_stroke2 = hypertension_df2.groupBy('stroke').count()  
group_stroke2.show()


Grupo Hypertenso:
+------+-----+
|stroke|count|
+------+-----+
|     1| 8817|
+------+-----+

Grupo Não-Hypertenso:
+------+-----+
|stroke|count|
+------+-----+
|     1|31470|
+------+-----+



In [9]:
#Derrames por idade
from pyspark.sql.functions import col

age_group = strokes.groupBy('age').count()
age_group.orderBy(col('count').desc()).show()

+----+-----+
| age|count|
+----+-----+
|79.0| 2916|
|78.0| 2279|
|80.0| 1858|
|81.0| 1738|
|82.0| 1427|
|77.0|  994|
|74.0|  987|
|63.0|  942|
|76.0|  892|
|70.0|  881|
|66.0|  848|
|75.0|  809|
|67.0|  801|
|57.0|  775|
|73.0|  759|
|65.0|  716|
|72.0|  709|
|68.0|  688|
|69.0|  677|
|71.0|  667|
+----+-----+
only showing top 20 rows



In [10]:
#Derrames após os 50 anos.

postfiftyes = strokes.filter(df['age'] > 50)
postfiftyes.groupBy('stroke').count().show()

+------+-----+
|stroke|count|
+------+-----+
|     1|28938|
+------+-----+



In [11]:
#Nível médio de glicose para pessoas que, respectivamente, sofreram e não sofreram derrame.avg_glucose_level

glucose_level = df.groupBy('stroke').agg({'avg_glucose_level': 'avg'}) 
glucose_level.show()

+------+----------------------+
|stroke|avg(avg_glucose_level)|
+------+----------------------+
|     1|    119.95307046938272|
|     0|    103.60273130214506|
+------+----------------------+



In [12]:
#BMI (IMC = índice de massa corpórea) médio de quem sofreu e não sofreu derrame.

bmi = df.groupBy('stroke').agg({'bmi': 'avg'}) 

bmi.show()

+------+------------------+
|stroke|          avg(bmi)|
+------+------------------+
|     1|29.942490629729495|
|     0|27.989678933253657|
+------+------------------+



In [13]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#Colunas selecionadas do DF para utilização do asembler
pipeline_columns_df = ['age', 'bmi', 'hypertension', 'heart_disease', 'avg_glucose_level']

# Criar um assembler para combinar as variáveis em um vetor de recursos
assembler = VectorAssembler(inputCols= pipeline_columns_df, outputCol="features")

# Criar um modelo de árvore de decisão
dt = DecisionTreeClassifier(labelCol="stroke", featuresCol="features")

# Montar o pipeline
pipeline = Pipeline(stages=[assembler, dt])

# Dividir os dados em conjuntos de treinamento e teste
train_data, test_data = df.randomSplit([0.8, 0.2])

# Treinar o modelo
model = pipeline.fit(train_data)

# Fazer previsões nos dados de teste
predictions = model.transform(test_data)

# Avaliar o modelo usando a métrica de acurácia
evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)

# Imprimir a acurácia
print("Acurácia:", accuracy)

Acurácia: 0.6888000592197794


In [14]:
#Adição de duas novas variaveis para checar a melhora do algoritimo.

# Indexar as colunas em formato string para numérico
indexer1 = StringIndexer(inputCol="gender", outputCol="indexed_gender")
indexer2 = StringIndexer(inputCol="smoking_status", outputCol="indexed_smoking_status")

#Colunas selecionadas do DF para utilização do asembler
pipeline_columns_df = ['age', 'bmi', 'hypertension', 'heart_disease', 'avg_glucose_level', 'indexed_smoking_status', 'indexed_gender']

# Criar um assembler para combinar as variáveis em um vetor de recursos
assembler = VectorAssembler(inputCols= pipeline_columns_df, outputCol="features")

# Criar um modelo de árvore de decisão
dt = DecisionTreeClassifier(labelCol="stroke", featuresCol="features")

# Montar o pipeline
pipeline = Pipeline(stages=[indexer1, indexer2, assembler, dt])

# Dividir os dados em conjuntos de treinamento e teste
train_data, test_data = df.randomSplit([0.8, 0.2])

# Treinar o modelo
model = pipeline.fit(train_data)

# Fazer previsões nos dados de teste
predictions = model.transform(test_data)

# Avaliar o modelo usando a métrica de acurácia
evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Imprimir a acurácia
print("Acurácia:", accuracy)


Acurácia: 0.8399408284023668


In [15]:
# Verificação da importancia das variáveis da arvore de Decisão

#Neste código, model.stages[-1] acessa a última etapa do pipeline, que é o modelo de árvore de decisão, e featureImportances retorna as importâncias das variáveis.

feature_importances = model.stages[-1].featureImportances
for i, importance in enumerate(feature_importances):
    print(f"Importância da variável {pipeline_columns_df[i]}: {importance}")


Importância da variável age: 0.13122909281688455
Importância da variável bmi: 0.0
Importância da variável hypertension: 0.0
Importância da variável heart_disease: 0.0
Importância da variável avg_glucose_level: 0.0075241927536957385
Importância da variável indexed_smoking_status: 0.8612467144294197
Importância da variável indexed_gender: 0.0


In [16]:
# Acessar a quantidade de nós na árvore de decisão
num_nodes = model.stages[-1].numNodes #Neste código, model.stages[-1] acessa a última etapa do pipeline, que é o modelo de árvore de decisão

# Imprimir a quantidade de nós
print("Quantidade de nós na árvore de decisão:", num_nodes)


Quantidade de nós na árvore de decisão: 11


In [17]:

# Obter a árvore de decisão do modelo
tree_model = model.stages[-1]

# Exportar a árvore de decisão para um formato que pode ser visualizado
tree_dot = tree_model.toDebugString

# Salvar a estrutura da árvore em um arquivo de texto
with open('tree_structure.txt', 'w') as file:
    file.write(tree_dot)