## DESM2 - Desafio do Módulo 2 - Pós em Ciência de Dados

### Desafio Prático 
### Módulo 2: Desenvolvimento de Soluções Utilizando Spark 
### Objetivos de Ensino 
Exercitar os seguintes conceitos trabalhados no Módulo: 
✓ Exercitar o módulo Spark SQL do Apache Spark; 
✓ Exercitar o módulo Spark MLLib do Apache Spark. 

### Enunciado 
Doenças ligadas ao coração afetam milhões de pessoas ao redor do mundo e, segundo a Organização Mundial da Saúde (OMS), são a segunda principal causa de morte da população mundial. Como cientista de dados, você foi contratado para criar um modelo preditivo que, a partir de dados de pacientes como idade, gênero, nível de glicose, se o paciente fuma ou não, vai prever se aquele paciente terá um derrame cerebral ou não. 
Você tem acesso a um arquivo que possui atributos de pacientes e um atributo “stroke” (derrame), que indica se aquele paciente sofreu um evento de derrame ou não. 

O conjunto de dados stroke_data.csv está disponível em:  
• https://drive.google.com/file/d/163BGU_RzXR29UbVVkPpv8tYnUejlPBVr/view?usp=share_link.  

Para uma descrição das colunas, veja a seção “Attribute information” em: 

• https://www.kaggle.com/fedesoriano/stroke-prediction-dataset.

As questões objetivas vão guiá-lo para a análise exploratória e para o modelo 
preditivo que você criará a partir dos dados. 

Links úteis:

● https://spark.apache.org/docs/latest/sql-getting-started.html 

● https://spark.apache.org/docs/latest/ml-classificationregression.html#decision-tree-classifier 

### Atividades 
Os alunos deverão desempenhar as seguintes atividades: 
1. Assistir as aulas gravadas sobre os módulos Spark SQL e Spark ML; 
2. Assistir a aula interativa sobre Spark ML; 
3. A seguir, você estará apto a responder às perguntas. :-)

## Informação dos Atributos

### Attribute Information
1) id: unique identifier
2) gender: "Male", "Female" or "Other"
3) age: age of the patient
4) hypertension: 0 if the patient doesn't have hypertension, 1 if the patient has hypertension
5) heart_disease: 0 if the patient doesn't have any heart diseases, 1 if the patient has a heart disease
6) ever_married: "No" or "Yes"
7) work_type: "children", "Govt_jov", "Never_worked", "Private" or "Self-employed"
8) Residence_type: "Rural" or "Urban"
9) avg_glucose_level: average glucose level in blood
10) bmi: body mass index
11) smoking_status: "formerly smoked", "never smoked", "smokes" or "Unknown"*
12) stroke: 1 if the patient had a stroke or 0 if not

*Note: "Unknown" in smoking_status means that the information is unavailable for this patient

## Importando o Pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, stddev, sum, abs, avg, length

# Configuração básica do Spark
spark = SparkSession.builder \
    .appName("Desafio") \
    .getOrCreate()

## Importando Dados do Arquivo

O conjunto de dados stroke_data.csv está disponível em:

• https://drive.google.com/file/d/163BGU_RzXR29UbVVkPpv8tYnUejlPBVr/view?usp=share_link.

Para uma descrição das colunas, veja a seção “Attribute information” em:

• https://www.kaggle.com/fedesoriano/stroke-prediction-dataset.

In [3]:
# importando o arquivo stroke_data.csv
df = (spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("stroke_data.csv")
)

## Visualizando os dados

In [4]:
# visualizando dataframe
df.show()

+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  0|gender| age|hypertension|heart_disease|ever_married|    work_type|Residence_type|avg_glucose_level|  bmi| smoking_status|stroke|
+---+------+----+------------+-------------+------------+-------------+--------------+-----------------+-----+---------------+------+
|  1|Female|18.0|           0|            0|          No|      Private|         Urban|            94.19|12.12|         smokes|     1|
|  2|  Male|58.0|           1|            0|         Yes|      Private|         Rural|           154.24| 33.7|   never_smoked|     0|
|  3|Female|36.0|           0|            0|         Yes|     Govt_job|         Urban|            72.63| 24.7|         smokes|     0|
|  4|Female|62.0|           0|            0|         Yes|Self-employed|         Rural|            85.52| 31.2|formerly smoked|     0|
|  5|Female|82.0|           0|            0|         Yes|     

## Estatística descritiva

In [5]:
# visualizando as estatísticas descritivas do conjunto de dados
df.describe().show()

+-------+------------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+---------------+-------------------+
|summary|                 0|gender|               age|       hypertension|      heart_disease|ever_married|work_type|Residence_type| avg_glucose_level|               bmi| smoking_status|             stroke|
+-------+------------------+------+------------------+-------------------+-------------------+------------+---------+--------------+------------------+------------------+---------------+-------------------+
|  count|             67135| 67135|             67135|              67135|              67135|       67135|    67135|         67135|             67135|             67135|          67135|              67135|
|   mean|           33568.0|  NULL| 51.95950845311693|0.16410218217025396|0.10142250688910405|        NULL|     NULL|          NULL|113.41439606762462| 29.16154047813857|  

## Pergunta 1

### Quantos registros existem no arquivo?

In [6]:
numero_registros = df.count()
numero_registros

67135

### Resposta: 67135

## Pergunta 2

### Quantas colunas existem no arquivo? Quantas são numéricas? Ao ler o arquivo com spark.read.csv, habilite inferSchema=True. Use a função printSchema() da API de Dataframes.

In [7]:
# Determinar o número total de colunas
num_total_colunas = len(df.columns)
print(f"Número total de colunas: {num_total_colunas}")

# Determinar o número de colunas numéricas
num_colunas_numericas = len([coluna for coluna, tipo in df.dtypes if tipo.startswith('int') or tipo.startswith('double')])
print(f"Número de colunas numéricas: {num_colunas_numericas}")

Número total de colunas: 12
Número de colunas numéricas: 7


In [8]:
df.printSchema()

root
 |-- 0: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: double (nullable = true)
 |-- hypertension: integer (nullable = true)
 |-- heart_disease: integer (nullable = true)
 |-- ever_married: string (nullable = true)
 |-- work_type: string (nullable = true)
 |-- Residence_type: string (nullable = true)
 |-- avg_glucose_level: double (nullable = true)
 |-- bmi: double (nullable = true)
 |-- smoking_status: string (nullable = true)
 |-- stroke: integer (nullable = true)



### Resposta: 12 e 7 respecivamente

## Pergunta 3

### No conjunto de dados, quantos pacientes sofreram e não sofreram derrame (stroke), respectivamente?

In [9]:
#Contando quantos sofreram derrame
sofreram_derrame = df.filter(col("stroke") == 1).count()
print(f"Sofreram Derrame: {sofreram_derrame}")

#Contando quantos não sofreram derrame
nao_sofreram_derrame = df.filter(col("stroke") == 0).count()
print(f"Não Sofreram Derrame: {nao_sofreram_derrame}")

Sofreram Derrame: 40287
Não Sofreram Derrame: 26848


### Resposta: Sofreram Derrame: 40287 e Não Sofreram Derrame: 26848

## Pergunta 4

### A partir do dataframe, crie uma tabela temporária usando df.createOrReplaceTempView('table') e a seguir use spark.sql para escrever uma consulta SQL que obtenha quantos pacientes tiveram derrame por tipo de trabalho (work_type). Quantos pacientes sofreram derrame e trabalhavam respectivamente, no setor privado, de forma independente, no governo e quantas são crianças?

In [12]:
# Criar uma visualização temporária
df.createOrReplaceTempView('pacientes')

# Escrever uma consulta SQL para contar quantos pacientes tiveram derrame por tipo de trabalho
consulta_sql = """
    SELECT work_type, COUNT(*) as total_derrame
    FROM pacientes
    WHERE stroke = 1
    GROUP BY work_type
"""

# Executar a consulta SQL usando spark.sql
resultado = spark.sql(consulta_sql)

resultado.show()

+-------------+-------------+
|    work_type|total_derrame|
+-------------+-------------+
| Never_worked|           85|
|Self-employed|        10807|
|      Private|        23711|
|     children|          520|
|     Govt_job|         5164|
+-------------+-------------+



### Resposta:
Privado: 23711

Self-employed: 10807

Govt_job: 5164

children: 520

## Pergunta 5

### Escreva uma consulta com spark.sql para determinar a proporção, por gênero, de participantes do estudo. A maioria dos participantes é:

In [15]:
# Criar uma visualização temporária
df.createOrReplaceTempView('participantes')

# Escrever uma consulta SQL para calcular a proporção por gênero
consulta_sql = """
    SELECT gender, COUNT(*) as total_participantes, 
           ROUND(COUNT(*) / (SELECT COUNT(*) FROM participantes), 2) as proporcao
    FROM participantes
    GROUP BY gender
"""

# Executar a consulta SQL usando spark.sql
resultado = spark.sql(consulta_sql)

resultado.show()

+------+-------------------+---------+
|gender|total_participantes|proporcao|
+------+-------------------+---------+
|Female|              39530|     0.59|
| Other|                 11|      0.0|
|  Male|              27594|     0.41|
+------+-------------------+---------+



### Resposta: Feminina

## Pergunta 6

### Escreva uma consulta com spark.sql para determinar quem tem mais probabilidade de sofrer derrame: hipertensos ou não-hipertensos. Você pode escrever uma consulta para cada grupo. A partir das probabilidades que você obteve, você conclui que:

In [16]:
# Criar uma visualização temporária
df.createOrReplaceTempView('pacientes')

# Escrever uma consulta SQL para calcular a probabilidade de sofrer derrame entre hipertensos
consulta_hipertensos = """
    SELECT CASE WHEN hypertension = 1 THEN 'Hipertensos' ELSE 'Não-Hipertensos' END AS grupo,
           COUNT(*) as total_pessoas,
           SUM(CASE WHEN stroke = 1 THEN 1 ELSE 0 END) as total_derrame,
           ROUND(SUM(CASE WHEN stroke = 1 THEN 1 ELSE 0 END) / COUNT(*), 4) as probabilidade
    FROM pacientes
    GROUP BY hypertension
"""

# Executar a consulta SQL para hipertensos usando spark.sql
resultado_hipertensos = spark.sql(consulta_hipertensos)

resultado_hipertensos.show()

+---------------+-------------+-------------+-------------+
|          grupo|total_pessoas|total_derrame|probabilidade|
+---------------+-------------+-------------+-------------+
|    Hipertensos|        11017|         8817|       0.8003|
|Não-Hipertensos|        56118|        31470|       0.5608|
+---------------+-------------+-------------+-------------+



### Resposta: A hipertensão, neste conjunto de dados, aumenta a probabilidade de derrame.

## Pergunta 7

### Escreva uma consulta com spark.sql que determine o número de pessoas que sofreram derrame por idade. Com qual idade o maior número de pessoas do conjunto de dados sofreu derrame?

In [17]:
# Criar uma visualização temporária
df.createOrReplaceTempView('pacientes')

# Escrever uma consulta SQL para determinar o número de pessoas que sofreram derrame por idade
consulta_derrame_por_idade = """
    SELECT age, COUNT(*) as total_pessoas_com_derrame
    FROM pacientes
    WHERE stroke = 1
    GROUP BY age
    ORDER BY total_pessoas_com_derrame DESC
"""

# Executar a consulta SQL usando spark.sql
resultado_derrame_por_idade = spark.sql(consulta_derrame_por_idade)

# Encontrar a idade com o maior número de pessoas que sofreram derrame
idade_maior_numero_derrame = resultado_derrame_por_idade.first()["age"]

# Imprimir o resultado
print("Número de pessoas que sofreram derrame por idade:")
resultado_derrame_por_idade.show()

print(f"\nA idade com o maior número de pessoas que sofreram derrame é: {idade_maior_numero_derrame}")

Número de pessoas que sofreram derrame por idade:
+----+-------------------------+
| age|total_pessoas_com_derrame|
+----+-------------------------+
|79.0|                     2916|
|78.0|                     2279|
|80.0|                     1858|
|81.0|                     1738|
|82.0|                     1427|
|77.0|                      994|
|74.0|                      987|
|63.0|                      942|
|76.0|                      892|
|70.0|                      881|
|66.0|                      848|
|75.0|                      809|
|67.0|                      801|
|57.0|                      775|
|73.0|                      759|
|65.0|                      716|
|72.0|                      709|
|68.0|                      688|
|69.0|                      677|
|71.0|                      667|
+----+-------------------------+
only showing top 20 rows


A idade com o maior número de pessoas que sofreram derrame é: 79.0


### Resposta: 79 anos

## Pergunta 8

### Usando a API de dataframes, determine quantas pessoas sofreram derrames após os 50 anos.

In [21]:
# Filtrar o DataFrame para pessoas que sofreram derrame após os 50 anos
derrame_apos_50 = df.filter((col("age") > 50) & (col("stroke") == 1))

# Contar o número de pessoas que sofreram derrame após os 50 anos
total_pessoas_derrame_apos_50 = derrame_apos_50.count()

total_pessoas_derrame_apos_50

28938

### Resposta: 28938 pessoas sofreram derrames após os 50 anos

## Pergunta 9

### Usando spark.sql, determine qual o nível médio de glicose para pessoas que, respectivamente, sofreram e não sofreram derrame.

In [27]:
# Criar uma visualização temporária
df.createOrReplaceTempView('pacientes')

# Escrever uma consulta SQL para calcular o nível médio de glicose para pessoas que sofreram derrame
consulta_glicose_derrame = """
    SELECT AVG(avg_glucose_level) AS nivel_medio_glicose_derrame
    FROM pacientes
    WHERE stroke = 1
"""

# Executar a consulta SQL para pessoas que sofreram derrame usando spark.sql
resultado_glicose_derrame = spark.sql(consulta_glicose_derrame)

# Imprimir o resultado
print("Nível médio de glicose para pessoas que sofreram derrame:")
resultado_glicose_derrame.show()



Nível médio de glicose para pessoas que sofreram derrame:
+---------------------------+
|nivel_medio_glicose_derrame|
+---------------------------+
|         119.95307046938272|
+---------------------------+



In [28]:
# Escrever uma consulta SQL para calcular o nível médio de glicose para pessoas que não sofreram derrame
consulta_glicose_sem_derrame = """
    SELECT AVG(avg_glucose_level) AS nivel_medio_glicose_sem_derrame
    FROM pacientes
    WHERE stroke = 0
"""

# Executar a consulta SQL para pessoas que não sofreram derrame usando spark.sql
resultado_glicose_sem_derrame = spark.sql(consulta_glicose_sem_derrame)

print("Nível médio de glicose para pessoas que não sofreram derrame:")
resultado_glicose_sem_derrame.show()


Nível médio de glicose para pessoas que não sofreram derrame:
+-------------------------------+
|nivel_medio_glicose_sem_derrame|
+-------------------------------+
|             103.60273130214506|
+-------------------------------+



### Resposta: Respectivamente, 119 e 103.

## Pergunta 10

### Qual é o BMI (IMC = índice de massa corpórea) médio de quem sofreu e não sofreu derrame?

In [29]:
# Criar uma visualização temporária
df.createOrReplaceTempView('pacientes')

# Escrever uma consulta SQL para calcular o IMC médio para pessoas que sofreram derrame
consulta_imc_derrame = """
    SELECT AVG(bmi) AS imc_medio
    FROM pacientes
    WHERE stroke = 1
"""

# Executar a consulta SQL para pessoas que sofreram derrame usando spark.sql
resultado_imc_derrame = spark.sql(consulta_imc_derrame)

# Imprimir o resultado
print("O IMC médio para pessoas que sofreram derrame:")
resultado_imc_derrame.show()

O IMC médio para pessoas que sofreram derrame:
+------------------+
|         imc_medio|
+------------------+
|29.942490629729495|
+------------------+



In [30]:
# Criar uma visualização temporária
df.createOrReplaceTempView('pacientes')

# Escrever uma consulta SQL para calcular o IMC médio para pessoas que não sofreram derrame
consulta_imc_sem_derrame = """
    SELECT AVG(bmi) AS imc_medio
    FROM pacientes
    WHERE stroke = 0
"""

# Executar a consulta SQL para pessoas que sofreram derrame usando spark.sql
resultado_imc_sem_derrame = spark.sql(consulta_imc_sem_derrame)

# Imprimir o resultado
print("O IMC médio para pessoas que não sofreram derrame:")
resultado_imc_sem_derrame.show()

O IMC médio para pessoas que não sofreram derrame:
+------------------+
|         imc_medio|
+------------------+
|27.989678933253657|
+------------------+



### Resposta: Respectivamente 29,94 e 27,99

## Pergunta 11

### Crie um modelo de árvore de decisão que prevê a chance de derrame (stroke) a partir das variáveis contínuas/categóricas: idade, BMI, hipertensão, doença do coração, nível médio de glicose. Use o conteúdo da segunda aula interativa para criar e avaliar o modelo.

### Qual a acurácia de um modelo construído?

#### Importando bibliotecas de ML

In [31]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#### Tratando as variáveis

In [32]:
# Tratamento de variáveis categóricas
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(df) for col in ["hypertension", "heart_disease"]]
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df)


#### Montando as features

In [33]:
# Montar os features
assembler = VectorAssembler(
    inputCols=["age", "bmi", "avg_glucose_level", "hypertension", "heart_disease"],
    outputCol="features")


#### Preparar Dados para Treinamento e teste

In [34]:
# Preparar os dados para treinamento e teste
data = assembler.transform(df_indexed)
(trainingData, testData) = data.randomSplit([0.7, 0.3])


#### Criando modelo de árvore de decisão

In [35]:
# Criar o modelo de árvore de decisão
dt = DecisionTreeClassifier(labelCol="stroke", featuresCol="features")

#### Treinando o modelo

In [36]:
# Treinar o modelo
model = dt.fit(trainingData)

#### Predição com dados de teste

In [37]:
# Prever com os dados de teste
predictions = model.transform(testData)

#### Avaliando o modelo

In [38]:
# Avaliar o modelo
evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)


#### Visualizando a acurácia do modelo

In [39]:
print("Acurácia do modelo:", accuracy)

Acurácia do modelo: 0.6927602712405265


### Resposta: Menor que 75%

## Pergunta 12

### Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas.  A acurácia (qualidade) do modelo aumentou para:

In [40]:
# Tratamento de variáveis categóricas
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index").fit(df) for col in ["gender", "ever_married", "work_type", "Residence_type", "smoking_status"]]
pipeline = Pipeline(stages=indexers)
df_indexed = pipeline.fit(df).transform(df)

# Montar os features
assembler = VectorAssembler(
    inputCols=["age", "hypertension", "heart_disease", "avg_glucose_level", "bmi", "gender_index", "ever_married_index", "work_type_index", "Residence_type_index", "smoking_status_index"],
    outputCol="features")

# Preparar os dados para treinamento e teste
data = assembler.transform(df_indexed)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Criar o modelo de árvore de decisão
dt = DecisionTreeClassifier(labelCol="stroke", featuresCol="features")

# Treinar o modelo
model = dt.fit(trainingData)

# Prever com os dados de teste
predictions = model.transform(testData)

# Avaliar o modelo
evaluator = MulticlassClassificationEvaluator(labelCol="stroke", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Imprimir a acurácia do modelo
print("Acurácia do modelo após adicionar variáveis categóricas:", accuracy)


Acurácia do modelo após adicionar variáveis categóricas: 0.8344400619102301


### Resposta: Acima de 80%

## Pergunta 13

### Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas. Qual dessas variáveis é mais importante no modelo de árvore de decisão que você construiu?

In [43]:
# Visualizar a importância das variáveis
importances = model.featureImportances
feature_names = ["age", "hypertension", "heart_disease", "avg_glucose_level", "bmi", "gender", "ever_married", "work_type", "Residence_type", "smoking_status"]
for i in range(len(feature_names)):
    print(f"{feature_names[i]}: {importances[i]}")

age: 0.1659015889285094
hypertension: 0.0
heart_disease: 0.0
avg_glucose_level: 0.007765815589382441
bmi: 0.0009309743584070536
gender: 0.0
ever_married: 0.0
work_type: 0.0
Residence_type: 0.0
smoking_status: 0.8254016211237011


### Resposta: Status sobre fumo.

## Pergunta 14

### Adicione ao modelo as variáveis categóricas: gênero e status de fumante. Use o conteúdo da aula interativa para lidar com as variáveis categóricas. Qual a profundidade da árvore de decisão? 

In [44]:
# Obter a profundidade da árvore de decisão
profundidade_arvore = model.depth

# Imprimir a profundidade da árvore de decisão
print("Profundidade da árvore de decisão:", profundidade_arvore)

Profundidade da árvore de decisão: 5


### Resposta: entre 2 e 7

## Pergunta 15

### Quantos nodos a árvore de decisão possui?

In [45]:
num_nodos_arvore = model.numNodes

# Imprimir o número de nós na árvore de decisão
print("Número de nós na árvore de decisão:", num_nodos_arvore)

Número de nós na árvore de decisão: 17


### Resposta: 17 nodos

In [46]:
# Encerrar a sessão Spark
spark.stop()