In [None]:
! pip install pyspark

# Iniciar Sessão Spark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
        .builder \
        .appName("Titanic-ML") \
        .getOrCreate()

spark.version

# Carregar o conjunto de dados


In [None]:
df = spark.read.csv('/kaggle/input/stroke-data', header='True', inferSchema='True')

df.printSchema()

### Quantos registros existem no arquivo?

In [None]:
num_records = df.count()
print(f"O Dataset possui {num_records} registros.")

### Quantas colunas existem no arquivo? 

In [None]:
num_columns = len(df.columns)

print(f"O DataFrame possui {num_columns} colunas.")

### Quantas são numéricas? 

In [None]:
from pyspark.sql.types import NumericType

num_numeric_cols = 0

for col in df.columns:
    data_type = df.schema[col].dataType
    if isinstance(data_type, NumericType):
        num_numeric_cols += 1
        
print(f"O DataFrame possui {num_numeric_cols} colunas numéricas.")

### Quantos pacientes sofreram e não sofreram derrame (stroke), respectivamente?

In [None]:
df.createOrReplaceTempView('stroke_table')

stroke_count = spark.sql("SELECT stroke, COUNT(*) as count FROM stroke_table GROUP BY stroke")
stroke_count.show()

### Quantos pacientes tiveram derrame por tipo de trabalho (work_type)?

Quantos pacientes sofreram derrame e trabalhavam respectivamente, no setor privado, de forma independente, no governo e quantas são crianças?

In [None]:
stroke_by_work_type = spark.sql("SELECT work_type, COUNT(*) as count FROM stroke_table WHERE stroke = 1 GROUP BY work_type")
stroke_by_work_type.show()

### Qual a proporção, por gênero, de participantes do estudo. 

A maioria dos participantes é?

In [None]:
gender = spark.sql("SELECT gender, COUNT(*) as count FROM stroke_table GROUP BY gender")
gender.show()

### Quem tem mais probabilidade de sofrer derrame: hipertensos ou não-hipertensos?
 

In [None]:
total = spark.sql("SELECT hypertension, COUNT(*) as total FROM stroke_table GROUP BY hypertension")
total.show()

In [None]:
total_stroke = spark.sql("SELECT hypertension, COUNT(*) as total_stroke FROM stroke_table WHERE stroke = 1 GROUP BY hypertension")
total_stroke.show()

In [None]:
result = total.join(total_stroke, 'hypertension', 'left_outer')

result = result.withColumn("Probs_stroke", result["total_stroke"] / result["total"])

result.show()

### Qual o número de pessoas que sofreram derrame por idade?

In [None]:
stroke_by_age = spark.sql("SELECT age, COUNT(*) as count FROM stroke_table WHERE stroke = 1 GROUP BY age")
stroke_by_age.show()

###  Com qual idade o maior número de pessoas do conjunto de dados sofreu derrame?

In [None]:
# Ordenar os resultados em ordem decrescente pela contagem
stroke_by_age_ordered = stroke_by_age.orderBy("count", ascending=False)

# Selecionar a primeira linha, que terá a idade com a maior contagem
greater_age = stroke_by_age_ordered.first()
greater_age

###  Quantas pessoas sofreram derrames após os 50 anos?

In [None]:
stroke_age_greater_50 = spark.sql("SELECT age FROM stroke_table WHERE stroke = 1 AND age>50").count()
stroke_age_greater_50 

#### Qual o nível médio de glicose para pessoas que, respectivamente, sofreram e não sofreram derrame?

In [None]:
avg_glucose_level = spark.sql("SELECT stroke, AVG(avg_glucose_level) as avg_glucose FROM stroke_table GROUP BY stroke")
avg_glucose_level.show()

### Qual é o BMI (IMC = índice de massa corpórea) médio de quem sofreu e não sofreu derrame?

In [None]:
avg_imc = spark.sql("SELECT stroke, AVG(bmi) as avg_bmi FROM stroke_table GROUP BY stroke")
avg_imc.show()

# Modelo de árvore de decisão para prevê a chance de derrame (stroke) 


In [None]:
train_data, test_data = df.randomSplit([0.7, 0.3])

In [None]:
from pyspark.ml.feature import VectorAssembler

# usar as variáveis contínuas/categóricas binárias: 
# idade, BMI, hipertensão, doença do coração, nível médio de glicose.  
numerical_cols = ['age', 'bmi', 'hypertension', 'heart_disease', 'avg_glucose_level']

assembler = VectorAssembler(inputCols=numerical_cols, outputCol='features')

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

classifier = DecisionTreeClassifier(labelCol='stroke', featuresCol='features')

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[ assembler, classifier])

In [None]:
%time predict_pipeline = pipeline.fit(train_data)

In [None]:
predictions = predict_pipeline.transform(test_data)
predictions.select('0', 'rawPrediction', 'prediction', 'stroke').show(50)

In [None]:
df.show(5)

### Métricas do modelo

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.sql import SparkSession, Row

def evaluator(predictions):
    
    # Define as métricas de avaliação
    evaluator_acc = MulticlassClassificationEvaluator(labelCol='stroke', predictionCol='prediction', metricName='accuracy')
    evaluator_precision = MulticlassClassificationEvaluator(labelCol='stroke', predictionCol='prediction', metricName='weightedPrecision')
    evaluator_recall = MulticlassClassificationEvaluator(labelCol='stroke', predictionCol='prediction', metricName='weightedRecall')
    evaluator_f1 = MulticlassClassificationEvaluator(labelCol='stroke', predictionCol='prediction', metricName='f1')
    evaluator_auc = BinaryClassificationEvaluator(labelCol='stroke', rawPredictionCol='rawPrediction', metricName='areaUnderROC')

    # Calcula as métricas
    accuracy = evaluator_acc.evaluate(predictions)
    precision = evaluator_precision.evaluate(predictions)
    recall = evaluator_recall.evaluate(predictions)
    f1 = evaluator_f1.evaluate(predictions)
    auc = evaluator_auc.evaluate(predictions)

    metrics_data = [
        Row(Metric="Accuracy", Value=round(accuracy,4)),
        Row(Metric="Precision", Value=round(precision,4)),
        Row(Metric="Recall", Value=round(recall,4)),
        Row(Metric="F1 Score", Value=round(f1,4)),
        Row(Metric="AUC", Value=round(auc,4)),
    ]

    # Create a DataFrame from the list of rows
    metrics_df = spark.createDataFrame(metrics_data)
    
    return metrics_df

In [None]:
metrics_df = evaluator(predictions)
# Mostra o DataFrame
metrics_df.show()

### Adicionar ao modelo as variáveis categóricas: gênero e status de fumante

In [None]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

# Define as colunas a serem tratadas 
categorical_cols = ["gender", "smoking_status"]
# Cria os StringIndexers para as colunas categóricas
string_indexers = [StringIndexer(inputCol=col, outputCol=col + '_index') for col in categorical_cols]
# Cria o OneHotEncoder para as colunas indexadas
one_hot_encoders = [OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=indexer.getOutputCol() + '_OHE') for indexer in string_indexers]


In [None]:
# Crie uma lista de todas as colunas codificadas
encoded_cols = [encoder.getOutputCol() for encoder in one_hot_encoders]
all_cols = numerical_cols + encoded_cols
all_cols

In [None]:
assembler = VectorAssembler(inputCols=all_cols, outputCol='features')

In [None]:

# Lista de estágios do pipeline
stages = string_indexers + one_hot_encoders + [assembler, classifier]
# Criar um objeto Pipeline
pipeline = Pipeline(stages=stages)

In [None]:
%time predict_pipeline = pipeline.fit(train_data)

In [None]:
predictions = predict_pipeline.transform(test_data)
predictions.select('0', 'rawPrediction', 'prediction', 'stroke').show(50)

In [None]:
# Mostrar as métricas
metrics_df = evaluator(predictions)
metrics_df.show()

### Qual dessas variáveis é mais importante no modelo de árvore de decisão?

In [None]:
model = predict_pipeline.stages[-1]

list(zip(assembler.getInputCols(), model.featureImportances))

### Qual a profundidade da árvore de decisão? 

In [None]:
model.depth

### Quantos nodos a árvore de decisão possui?

In [None]:
model.numNodes