In [0]:
# Load the raw heart-attack dataset; the first row holds the column names and
# column types are inferred by Spark.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/FileStore/tables/heart_attack_prediction_dataset.csv')
)

In [0]:
from pyspark.sql.functions import split
# "Blood Pressure" is split on "/" into two parts; each part is cast to an
# integer and stored in its own column.
pressure_parts = split(df["Blood Pressure"], "/")
df = (
    df
    .withColumn("Pressao_Sistolica", pressure_parts.getItem(0).cast("int"))
    .withColumn("Pressao_Diastolica", pressure_parts.getItem(1).cast("int"))
)
display(df)

In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
import pyspark.sql.functions as func
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import OneHotEncoder

# Columns that go into the feature vector unchanged. The order is significant:
# it fixes the position of each feature inside the assembled vector.
feature_columns = [
    'Age',
    'Cholesterol',
    'Heart Rate',
    'Diabetes',
    'Family History',
    'Smoking',
    'Obesity',
    'Alcohol Consumption',
    'Exercise Hours Per Week',
    'Previous Heart Problems',
    'Medication Use',
    'Stress Level',
    'Sedentary Hours Per Day',
    'Income',
    'BMI',
    'Triglycerides',
    'Physical Activity Days Per Week',
    'Sleep Hours Per Day',
    'Pressao_Sistolica',
    'Pressao_Diastolica',
]

# Categorical columns to be converted to numeric indices.
string_columns = ['Sex', 'Diet', 'Country', 'Continent', 'Hemisphere']

# Earlier experiment, kept for reference only (reduced feature set):
#   feature_columns = ['Physical Activity Days Per Week', 'Medication Use',
#                      'Previous Heart Problems', 'Stress Level', 'Smoking', 'Age']
#   string_columns = ['Sex', 'Diet', 'Blood Pressure']

# One "<column>_index" output per categorical column.
index_columns = [f"{name}_index" for name in string_columns]

# One StringIndexer per categorical column.
indexers = [
    StringIndexer(inputCol=source, outputCol=target, handleInvalid="keep")
    for source, target in zip(string_columns, index_columns)
]

# Pack the plain columns plus the indexed categoricals into 'features'.
vec_assembler = VectorAssembler(
    inputCols=[*feature_columns, *index_columns],
    outputCol='features',
)

# Chain the stages: all indexers first, then the assembler.
pipeline = Pipeline(stages=[*indexers, vec_assembler])

# NOTE(review): df_transform is reassigned by the one-hot pipeline in the next
# cell, so this result is not what the later train/test split consumes.
fitted_pipeline = pipeline.fit(df)
df_transform = fitted_pipeline.transform(df)

In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline
import pyspark.sql.functions as func
from pyspark.ml.classification import RandomForestClassifier

# Columns that go into the feature vector unchanged. The order is significant:
# it fixes the position of each feature inside the assembled vector.
feature_columns = [
    'Age',
    'Cholesterol',
    'Heart Rate',
    'Diabetes',
    'Family History',
    'Smoking',
    'Obesity',
    'Alcohol Consumption',
    'Exercise Hours Per Week',
    'Previous Heart Problems',
    'Medication Use',
    'Stress Level',
    'Sedentary Hours Per Day',
    'Income',
    'BMI',
    'Triglycerides',
    'Physical Activity Days Per Week',
    'Sleep Hours Per Day',
    'Pressao_Sistolica',
    'Pressao_Diastolica',
]

# Categorical columns to be converted.
string_columns = ['Sex', 'Diet', 'Country', 'Continent', 'Hemisphere']

# Stage 1: one StringIndexer per categorical column ("<column>_index").
indexers = []
# Stage 2: one OneHotEncoder per indexed column ("<column>_encoded").
onehot_encoders = []
# Names of the encoded columns, in the same order as string_columns.
encoded_columns = []
for categorical in string_columns:
    indexed_name = categorical + "_index"
    encoded_name = categorical + "_encoded"
    indexers.append(
        StringIndexer(inputCol=categorical, outputCol=indexed_name, handleInvalid="keep")
    )
    onehot_encoders.append(OneHotEncoder(inputCol=indexed_name, outputCol=encoded_name))
    encoded_columns.append(encoded_name)

# Stage 3: pack the plain columns plus the encoded categoricals into 'features'.
vec_assembler = VectorAssembler(
    inputCols=feature_columns + encoded_columns,
    outputCol='features',
)

# Chain the stages: indexers, then encoders, then the assembler.
pipeline = Pipeline(stages=[*indexers, *onehot_encoders, vec_assembler])

# Fit the pipeline on the full frame and apply it to the same frame.
pipeline_model = pipeline.fit(df)
df_transform = pipeline_model.transform(df)


In [0]:
# 80/20 train/test split with a fixed seed.
split_weights = [0.8, 0.2]
df_train, df_test = df_transform.randomSplit(split_weights, seed=1234)

In [0]:
# Random forest over the assembled 'features' vector, predicting
# 'Heart Attack Risk'. The seed is pinned so that re-running the notebook does
# not depend on the estimator's default seed.
rf = RandomForestClassifier(
    maxDepth=4,
    labelCol='Heart Attack Risk',
    featuresCol='features',
    maxBins=30,
    seed=1234,
)

# Fit on the training split.
rf_model = rf.fit(df_train)

# Human-readable dump of every tree in the forest, kept for inspection.
tree_structure = rf_model.toDebugString

# Use the model's own node count (summed over all trees). Counting the lines
# of the debug string is not a node count: the dump also contains header lines
# and separate If/Else lines for each split.
num_nodes = rf_model.totalNumNodes

# Show the number of nodes.
print(f"Número total de nós na árvore: {num_nodes}")

In [0]:
# Score the test split and expose the model output as 'prediction_RF'.
df_pred = (
    rf_model
    .transform(df_test)
    .withColumnRenamed('prediction', 'prediction_RF')
)

In [0]:
from pyspark.sql.functions import col, count, when
from pyspark.sql.functions import sum as spark_sum

# Difference between the predicted class and the true label; 0 means a hit.
df_comparison = df_pred.withColumn("prediction_diff", col("prediction_RF") - col("Heart Attack Risk"))

# Count hits, misses and total rows in a single aggregation so the predictions
# are evaluated once instead of once per count() call.
prediction_totals = df_comparison.agg(
    count("*").alias("total"),
    spark_sum(when(col("prediction_diff") == 0, 1).otherwise(0)).alias("correct"),
    spark_sum(when(col("prediction_diff") != 0, 1).otherwise(0)).alias("incorrect"),
).first()

# The sums come back as None when the frame is empty, so fall back to 0.
total_predictions = prediction_totals["total"]
correct_predictions = prediction_totals["correct"] or 0
incorrect_predictions = prediction_totals["incorrect"] or 0

# Accuracy is undefined for an empty test split; report NaN instead of raising
# ZeroDivisionError.
accuracy = correct_predictions / total_predictions if total_predictions else float("nan")

# Show the results.
print(f"Total de Previsões: {total_predictions}")
print(f"Número de Acertos: {correct_predictions}")
print(f"Número de Erros: {incorrect_predictions}")
print(f"Acurácia: {accuracy:.2%}")


In [0]:
# Number of test rows the first model assigned to class 0.
predicted_zero = df_pred['prediction_RF'] == 0
df_pred.where(predicted_zero).count()

In [0]:
# Number of rows in the full dataset whose 'Heart Attack Risk' label is 0.
label_is_zero = df['Heart Attack Risk'] == 0
df.where(label_is_zero).count()


In [0]:
# Number of rows in the full dataset whose 'Heart Attack Risk' label is 1.
label_is_one = df['Heart Attack Risk'] == 1
df.where(label_is_one).count()

In [0]:
# Partition the dataset by label: one frame per 'Heart Attack Risk' class.
risk_label = df['Heart Attack Risk']
df_zero = df.where(risk_label == 0)
df_um = df.where(risk_label == 1)

In [0]:
from pyspark.sql.functions import rand

# Undersample class 0 down to the size of class 1. The target size is read
# from df_um instead of being hard-coded, so it follows the data.
minority_count = df_um.count()

# The previous sample(False, 1.0) kept every row and limit() on an unordered
# frame is not a random draw. Shuffle with a seeded random key before
# truncating so the subset is random and intended to be repeatable.
# NOTE(review): repeatability of rand(seed) also depends on a stable
# partitioning / row order of the input — confirm on the target cluster.
amostra_zeros = (
    df_zero
    .distinct()
    .orderBy(rand(seed=1234))
    .limit(minority_count)
)

In [0]:
# Stack the class-0 sample on top of all class-1 rows, then show the result.
df_new = amostra_zeros.union(df_um)
display(df_new)

In [0]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml import Pipeline
import pyspark.sql.functions as func
from pyspark.ml.classification import RandomForestClassifier


# Columns that go into the feature vector unchanged. The order is significant:
# it fixes the position of each feature inside the assembled vector.
feature_columns = [
    'Age',
    'Cholesterol',
    'Heart Rate',
    'Diabetes',
    'Family History',
    'Smoking',
    'Obesity',
    'Alcohol Consumption',
    'Exercise Hours Per Week',
    'Previous Heart Problems',
    'Medication Use',
    'Stress Level',
    'Sedentary Hours Per Day',
    'Income',
    'BMI',
    'Triglycerides',
    'Physical Activity Days Per Week',
    'Sleep Hours Per Day',
    'Pressao_Sistolica',
    'Pressao_Diastolica',
]

# Categorical columns to be converted to numeric indices.
string_columns = ['Sex', 'Diet', 'Country', 'Continent', 'Hemisphere']

# Earlier experiment, kept for reference only (reduced feature set):
#   feature_columns = ['Physical Activity Days Per Week', 'Medication Use',
#                      'Previous Heart Problems', 'Stress Level', 'Smoking', 'Age']
#   string_columns = ['Sex', 'Diet', 'Blood Pressure']


def _index_name(column_name):
    """Return the name of the indexed counterpart of a categorical column."""
    return column_name + "_index"


# One StringIndexer per categorical column.
indexers = [
    StringIndexer(inputCol=categorical, outputCol=_index_name(categorical), handleInvalid="keep")
    for categorical in string_columns
]

# Pack the plain columns plus the indexed categoricals into 'features'
# (no one-hot encoding in this pipeline).
assembler_inputs = feature_columns + list(map(_index_name, string_columns))
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol='features')

# Chain the stages, then fit and apply them to the rebalanced frame.
pipeline = Pipeline(stages=[*indexers, vec_assembler])
pipeline_model_new = pipeline.fit(df_new)
df_transform_new = pipeline_model_new.transform(df_new)

In [0]:
# Split the rebalanced data 80/20 again, with the same fixed seed.
split_weights_new = [0.8, 0.2]
df_train_new, df_test_new = df_transform_new.randomSplit(split_weights_new, seed=1234)

In [0]:
# Random forest for the rebalanced data, over the assembled 'features' vector.
# The seed is pinned so that re-running the notebook does not depend on the
# estimator's default seed.
rf_new = RandomForestClassifier(
    maxDepth=2,
    labelCol='Heart Attack Risk',
    featuresCol='features',
    maxBins=32,
    seed=1234,
)

# Fit on the rebalanced training split.
rf_model_new = rf_new.fit(df_train_new)

# Human-readable dump of every tree in the forest, kept for inspection.
tree_structure_new = rf_model_new.toDebugString

# Use the model's own node count (summed over all trees). Counting the lines
# of the debug string is not a node count: the dump also contains header lines
# and separate If/Else lines for each split.
num_nodes_new = rf_model_new.totalNumNodes

# Show the number of nodes.
print(f"Número total de nós na árvore: {num_nodes_new}")

In [0]:
# Score the rebalanced test split and expose the output as 'prediction_RF'.
df_pred_new = (
    rf_model_new
    .transform(df_test_new)
    .withColumnRenamed('prediction', 'prediction_RF')
)

In [0]:
# Render df_transform, i.e. the output of the one-hot pipeline fitted on df.
# NOTE(review): the surrounding cells work with df_transform_new — confirm
# that df_transform is the frame meant to be shown here.
display(df_transform)

In [0]:
from pyspark.sql.functions import col, count, when
from pyspark.sql.functions import sum as spark_sum

# Difference between the predicted class and the true label; 0 means a hit.
df_comparison_new = df_pred_new.withColumn("prediction_diff", col("prediction_RF") - col("Heart Attack Risk"))

# Count hits, misses and total rows in a single aggregation so the predictions
# are evaluated once instead of once per count() call.
prediction_totals_new = df_comparison_new.agg(
    count("*").alias("total"),
    spark_sum(when(col("prediction_diff") == 0, 1).otherwise(0)).alias("correct"),
    spark_sum(when(col("prediction_diff") != 0, 1).otherwise(0)).alias("incorrect"),
).first()

# The sums come back as None when the frame is empty, so fall back to 0.
total_predictions_new = prediction_totals_new["total"]
correct_predictions_new = prediction_totals_new["correct"] or 0
incorrect_predictions_new = prediction_totals_new["incorrect"] or 0

# Accuracy is undefined for an empty test split; report NaN instead of raising
# ZeroDivisionError.
accuracy_new = (
    correct_predictions_new / total_predictions_new
    if total_predictions_new
    else float("nan")
)

# Show the results.
print(f"Total de Previsões: {total_predictions_new}")
print(f"Número de Acertos: {correct_predictions_new}")
print(f"Número de Erros: {incorrect_predictions_new}")
print(f"Acurácia: {accuracy_new:.2%}")


In [0]:
# Number of rebalanced test rows the second model assigned to class 1.
predicted_one_new = df_pred_new['prediction_RF'] == 1
df_pred_new.where(predicted_one_new).count()