In [None]:
# Bare expression: a notebook only displays the last expression of a cell,
# so this line produces no output where it stands.
import sys
sys.executable

# findspark.init() runs before the first pyspark import below.
import findspark
findspark.init()

import pandas as pd
import pyspark

# NOTE(review): pandas (above) and SparkSession / col (below) are imported more
# than once in this cell; the repeats are harmless but redundant.
import pandas as pd
import numpy as np
#import pyspark.pandas as ps
from pyspark.sql import SparkSession
# NOTE(review): this binds `plt` to the top-level matplotlib package, not to
# matplotlib.pyplot, so calls such as plt.plot(...) would fail — confirm intent.
import matplotlib as plt
import seaborn as sns
# NOTE: `sum` here is Spark's column aggregate; from this point on it shadows
# Python's built-in sum() in the notebook namespace.
from pyspark.sql.functions import col, sum



from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

In [None]:
# Create the Spark session for this notebook, or reuse one that already exists.
# master("local[*]") runs Spark locally with as many worker threads as cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('HeartDiseasePrediction')
    .getOrCreate()
)

In [None]:
# ## 2. Data loading
# Read the heart-disease CSV into a Spark DataFrame. The first line of the
# file is used as the header and column types are inferred from the data.
csv_path = 'data/heart_disease_uci.csv'
gt = spark.read.csv(csv_path, header=True, inferSchema=True)

In [None]:
# Sanity check: print the Python type of the object returned by spark.read.csv.
print(type(gt))

In [None]:
# Convert the Spark DataFrame into a pandas DataFrame for local inspection.
# toPandas() collects every row onto the driver, so it only suits small data.
df = gt.toPandas()

In [None]:
# Preview the first rows of the pandas copy.
df.head()

In [None]:
# `gt` is already a Spark DataFrame, so work on it directly instead of
# rebuilding one from the pandas copy. The pandas round-trip pulls every row
# onto the driver, and missing numeric values can come back as NaN rather than
# null (pandas has no null for float columns). Spark's mean treats NaN as a
# real value, which would poison the mean-imputation step.
spark_df = gt

In [None]:
# 3. Preprocessing: impute missing values
# Compute the mean of every continuous column that will be imputed.
# All four averages come from one aggregation, so Spark scans the data once
# instead of launching a separate job per column.
from pyspark.sql.functions import avg

column_means = spark_df.agg(
    avg("trestbps").alias("trestbps"),
    avg("chol").alias("chol"),
    avg("thalch").alias("thalch"),
    avg("oldpeak").alias("oldpeak"),
).first()

# Keep the individual names: the imputation step reads these variables.
mean_trestbps = column_means["trestbps"]
mean_chol = column_means["chol"]
mean_thalch = column_means["thalch"]
mean_oldpeak = column_means["oldpeak"]

In [None]:
# Apply the imputation: each listed column has its missing entries replaced
# by the mean computed for that same column.
imputation_values = {
    "trestbps": mean_trestbps,
    "chol": mean_chol,
    "thalch": mean_thalch,
    "oldpeak": mean_oldpeak,
}
spark_df = spark_df.fillna(imputation_values)

In [None]:
# 4. Cast boolean columns to string
# Only "exang" is converted here; it is later listed among the categorical
# columns, so it is turned into a string column first.
exang_as_string = col("exang").cast("string")
spark_df = spark_df.withColumn("exang", exang_as_string)

In [None]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler


# 5. Index categorical columns
# Fit one StringIndexer per categorical column; each writes "<column>_index".
# handleInvalid="skip" makes transform() filter out rows whose category is
# NULL. With the default ("error") the first Spark action after transform()
# raises on any NULL category, even though the notebook's intent is to drop
# rows with missing feature values.
# The comprehension variable is called `name` so it does not shadow the
# `col` function imported from pyspark.sql.functions.
categorical_cols = ["sex", "cp", "restecg", "exang", "slope", "thal"]
indexers = [
    StringIndexer(
        inputCol=name,
        outputCol=name + "_index",
        handleInvalid="skip",
    ).fit(spark_df)
    for name in categorical_cols
]


In [None]:
# Apply the fitted indexers to the DataFrame.
# Each transform() output is fed into the next one, so spark_df ends up with
# one additional "<column>_index" column per fitted indexer.
for indexer in indexers:
    spark_df = indexer.transform(spark_df)

In [None]:
# 6. Assemble the feature vector
# The model inputs are the continuous columns followed by the indexed version
# of every categorical column; VectorAssembler packs them into "features".
numeric_cols = ["age", "trestbps", "chol", "thalch", "oldpeak"]
indexed_cols = [name + "_index" for name in categorical_cols]
feature_cols = numeric_cols + indexed_cols

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
spark_df = assembler.transform(spark_df)

In [None]:
# Check the data type Spark assigned to the assembled "features" column.
features_field = spark_df.schema["features"]
print("Tipo de 'features' después de ensamblar:", features_field.dataType)

In [None]:
# Check for null values in the feature columns.
# A single aggregation counts the nulls of every feature column in one pass
# and shows one row of per-column counts. The previous version ran a separate
# count job per column and then displayed ordinary rows of the affected
# columns, which did not reveal how many values were missing.
# `sum` and `col` are the Spark functions imported from pyspark.sql.functions.
null_counts = spark_df.select(
    [sum(col(name).isNull().cast("int")).alias(name) for name in feature_cols]
)
null_counts.show()


In [None]:
# Drop every row that has a missing value in any of the feature columns.
spark_df = spark_df.dropna(how="any", subset=feature_cols)


In [None]:
# Display a few assembled feature vectors in full (no truncation) to inspect
# their contents.
features_preview = spark_df.select("features")
features_preview.show(n=5, truncate=False)


In [None]:
# Show descriptive statistics for the individual feature columns.
feature_stats = spark_df.select(feature_cols).describe()
feature_stats.show()


In [None]:
# Scale the features: fit a StandardScaler (default settings) on the
# assembled vectors, then add the scaled vectors as "scaledFeatures".
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scaler_model = scaler.fit(spark_df)
spark_df = scaler_model.transform(spark_df)
