##Ejercicio 1

In [2]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [3]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488490 sha256=5c5afb08817eeba3ae87811aa616ff7f500716fe6341432215d5d648e86086ae
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [4]:
from pyspark.sql import SparkSession

# Inicializar la sesión de Spark
spark = SparkSession.builder \
    .appName("Heart Data Analysis") \
    .getOrCreate()

In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# Definir el esquema del DataFrame
schema = StructType([
    StructField("age", IntegerType(), True),
    StructField("sex", IntegerType(), True),
    StructField("cp", IntegerType(), True),
    StructField("trestbps", IntegerType(), True),
    StructField("chol", IntegerType(), True),
    StructField("fbs", IntegerType(), True),
    StructField("restecg", IntegerType(), True),
    StructField("thalach", IntegerType(), True),
    StructField("exang", IntegerType(), True),
    StructField("oldpeak", FloatType(), True),
    StructField("slope", IntegerType(), True),
    StructField("ca", IntegerType(), True),
    StructField("thal", IntegerType(), True),
    StructField("target", IntegerType(), True)
])

# Cargar los datos con el esquema definido
data_path = '/content/heart.csv'
heart_df = spark.read.csv(data_path, header=True, schema=schema)

In [6]:
# Mostrar las primeras filas del DataFrame
heart_df.show(5)

# Mostrar el esquema del DataFrame
heart_df.printSchema()

# Describir las estadísticas básicas de las columnas numéricas
heart_df.describe().show()

# Contar la distribución de la columna objetivo ('target')
heart_df.groupBy('target').count().show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 5 rows

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable =

##Ejercisio 2

In [7]:
from pyspark.sql.functions import col, when, count, mean
from pyspark.sql import functions as F

# Verificar los valores nulos en cada columna
heart_df.select([count(when(col(c).isNull(), c)).alias(c) for c in heart_df.columns]).show()

# Rellenar los valores nulos en características numéricas con la mediana
numerical_cols = [c for c, t in heart_df.dtypes if t in ['int', 'double']]
for c in numerical_cols:
    median = heart_df.approxQuantile(c, [0.5], 0.0)[0]
    heart_df = heart_df.fillna({c: median})

# Rellenar los valores nulos en características categóricas con el valor más frecuente
categorical_cols = [c for c, t in heart_df.dtypes if t == 'string']
for c in categorical_cols:
    mode = heart_df.groupBy(c).count().orderBy('count', ascending=False).first()[0]
    heart_df = heart_df.fillna({c: mode})

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|  0|  0|  0|       0|   0|  0|      0|      0|    0|      0|    0|  0|   0|     0|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+



In [8]:
from pyspark.ml.feature import VectorAssembler, StandardScaler

# Ensamblar las características numéricas en un solo vector
assembler = VectorAssembler(inputCols=numerical_cols, outputCol="features_vector")
heart_df = assembler.transform(heart_df)

# Escalar las características
scaler = StandardScaler(inputCol="features_vector", outputCol="scaled_features", withStd=True, withMean=True)
scaler_model = scaler.fit(heart_df)
heart_df = scaler_model.transform(heart_df)

In [9]:
from pyspark.ml.feature import StringIndexer

# Codificar características categóricas
for c in categorical_cols:
    indexer = StringIndexer(inputCol=c, outputCol=c+"_index")
    heart_df = indexer.fit(heart_df).transform(heart_df)

In [10]:
# Seleccionar las columnas necesarias para el modelo
feature_cols = [c+"_index" for c in categorical_cols] + ["scaled_features"]
assembler_final = VectorAssembler(inputCols=feature_cols, outputCol="features")
heart_df = assembler_final.transform(heart_df)

# Seleccionar las columnas finales
heart_df = heart_df.select("features", "target")

# Mostrar el DataFrame final
heart_df.show(5)

+--------------------+------+
|            features|target|
+--------------------+------+
|[0.95062402146783...|     1|
|[-1.9121496945579...|     1|
|[-1.4717229690155...|     1|
|[0.17987725176857...|     1|
|[0.28998393315418...|     1|
+--------------------+------+
only showing top 5 rows



In [11]:
# Dividir los datos en conjuntos de entrenamiento (80%) y prueba (20%)
train_df, test_df = heart_df.randomSplit([0.8, 0.2], seed=42)

In [12]:
from pyspark.ml.classification import LogisticRegression

# Inicializar el modelo de regresión logística
lr = LogisticRegression(labelCol="target", featuresCol="features")

# Entrenar el modelo
lr_model = lr.fit(train_df)

In [13]:
# Realizar predicciones en el conjunto de datos de prueba
predictions = lr_model.transform(test_df)

# Mostrar algunas de las predicciones
predictions.select("features", "target", "prediction", "probability").show(5)

+--------------------+------+----------+--------------------+
|            features|target|prediction|         probability|
+--------------------+------+----------+--------------------+
|[-2.2424697387148...|     1|       1.0|[1.75031533035104...|
|[-2.1323630573292...|     1|       1.0|[2.87577376933252...|
|[-1.9121496945579...|     1|       1.0|[5.46417579478814...|
|[-1.6919363317867...|     1|       1.0|[2.65467912927529...|
|[-1.4717229690155...|     1|       1.0|[1.83349357053014...|
+--------------------+------+----------+--------------------+
only showing top 5 rows



In [14]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Inicializar el evaluador para el área bajo la curva ROC
evaluator = BinaryClassificationEvaluator(labelCol="target", rawPredictionCol="rawPrediction")

# Calcular el área bajo la curva ROC
roc_auc = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

# Mostrar el área bajo la curva ROC
print(f"Área bajo la curva ROC: {roc_auc}")

# Calcular la precisión, el recall y el F1-score
predictions.groupBy("target", "prediction").count().show()

# Definir las métricas de evaluación adicionales
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Precisión
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")
accuracy = accuracy_evaluator.evaluate(predictions)

# Recall
recall_evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="weightedRecall")
recall = recall_evaluator.evaluate(predictions)

# F1-score
f1_evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="f1")
f1 = f1_evaluator.evaluate(predictions)

# Mostrar las métricas de evaluación
print(f"Precisión: {accuracy}")
print(f"Recall: {recall}")
print(f"F1-score: {f1}")


Área bajo la curva ROC: 1.0
+------+----------+-----+
|target|prediction|count|
+------+----------+-----+
|     0|       0.0|   19|
|     1|       1.0|   28|
+------+----------+-----+

Precisión: 1.0
Recall: 1.0
F1-score: 1.0
