In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.functions import *
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.types import (StructField, StructType, StringType, IntegerType, DecimalType, FloatType)
from pyspark.ml.classification import RandomForestClassifier, NaiveBayes, DecisionTreeClassifier, MultilayerPerceptronClassifier
spark = SparkSession.builder.appName('TrabajoFinal').getOrCreate()
#["age", "sex", "cp", "trestbps", "chol", "fbs", "restecg", "thalach", "exang", "oldpeak", "slope", "ca", "thal", "num"]

In [2]:
schema = [StructField("age", FloatType(), True),
          StructField("sex", FloatType(), True),
          StructField("cp", FloatType(), True),
          StructField("trestbps", FloatType(), True),
          StructField("chol", FloatType(), True),
          StructField("fbs", FloatType(), True),
          StructField("restecg", FloatType(), True),
          StructField("thalach", FloatType(), True),
          StructField("exang", FloatType(), True),
          StructField("oldpeak", FloatType(), True),
          StructField("slope", FloatType(), True),
          StructField("ca", StringType(), True),
          StructField("thal", StringType(), True),
          StructField("num", FloatType(), True)]
schema_nuevo = StructType(fields=schema)
data = spark.read.csv("processed.cleveland.data", inferSchema=True, header=False, schema=schema_nuevo)
data.show()

+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+
| age|sex| cp|trestbps| chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|num|
+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+
|63.0|1.0|1.0|   145.0|233.0|1.0|    2.0|  150.0|  0.0|    2.3|  3.0|0.0| 6.0|0.0|
|67.0|1.0|4.0|   160.0|286.0|0.0|    2.0|  108.0|  1.0|    1.5|  2.0|3.0| 3.0|2.0|
|67.0|1.0|4.0|   120.0|229.0|0.0|    2.0|  129.0|  1.0|    2.6|  2.0|2.0| 7.0|1.0|
|37.0|1.0|3.0|   130.0|250.0|0.0|    0.0|  187.0|  0.0|    3.5|  3.0|0.0| 3.0|0.0|
|41.0|0.0|2.0|   130.0|204.0|0.0|    2.0|  172.0|  0.0|    1.4|  1.0|0.0| 3.0|0.0|
|56.0|1.0|2.0|   120.0|236.0|0.0|    0.0|  178.0|  0.0|    0.8|  1.0|0.0| 3.0|0.0|
|62.0|0.0|4.0|   140.0|268.0|0.0|    2.0|  160.0|  0.0|    3.6|  3.0|2.0| 3.0|3.0|
|57.0|0.0|4.0|   120.0|354.0|0.0|    0.0|  163.0|  1.0|    0.6|  1.0|0.0| 3.0|0.0|
|63.0|1.0|4.0|   130.0|254.0|0.0|    2.0|  147.0|  0.0|    1.4|  2.0|1.0| 7.0|2.0|
|53.

In [3]:
data = data.filter(data.ca != '?')
data = data.filter(data.thal != '?')
data.show()

+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+
| age|sex| cp|trestbps| chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|num|
+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+
|63.0|1.0|1.0|   145.0|233.0|1.0|    2.0|  150.0|  0.0|    2.3|  3.0|0.0| 6.0|0.0|
|67.0|1.0|4.0|   160.0|286.0|0.0|    2.0|  108.0|  1.0|    1.5|  2.0|3.0| 3.0|2.0|
|67.0|1.0|4.0|   120.0|229.0|0.0|    2.0|  129.0|  1.0|    2.6|  2.0|2.0| 7.0|1.0|
|37.0|1.0|3.0|   130.0|250.0|0.0|    0.0|  187.0|  0.0|    3.5|  3.0|0.0| 3.0|0.0|
|41.0|0.0|2.0|   130.0|204.0|0.0|    2.0|  172.0|  0.0|    1.4|  1.0|0.0| 3.0|0.0|
|56.0|1.0|2.0|   120.0|236.0|0.0|    0.0|  178.0|  0.0|    0.8|  1.0|0.0| 3.0|0.0|
|62.0|0.0|4.0|   140.0|268.0|0.0|    2.0|  160.0|  0.0|    3.6|  3.0|2.0| 3.0|3.0|
|57.0|0.0|4.0|   120.0|354.0|0.0|    0.0|  163.0|  1.0|    0.6|  1.0|0.0| 3.0|0.0|
|63.0|1.0|4.0|   130.0|254.0|0.0|    2.0|  147.0|  0.0|    1.4|  2.0|1.0| 7.0|2.0|
|53.

In [4]:
data = data.withColumn("ca", data["ca"].cast(FloatType()))
data = data.withColumn("thal", data["thal"].cast(FloatType()))
data.show()
data.printSchema()

+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+
| age|sex| cp|trestbps| chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|num|
+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+
|63.0|1.0|1.0|   145.0|233.0|1.0|    2.0|  150.0|  0.0|    2.3|  3.0|0.0| 6.0|0.0|
|67.0|1.0|4.0|   160.0|286.0|0.0|    2.0|  108.0|  1.0|    1.5|  2.0|3.0| 3.0|2.0|
|67.0|1.0|4.0|   120.0|229.0|0.0|    2.0|  129.0|  1.0|    2.6|  2.0|2.0| 7.0|1.0|
|37.0|1.0|3.0|   130.0|250.0|0.0|    0.0|  187.0|  0.0|    3.5|  3.0|0.0| 3.0|0.0|
|41.0|0.0|2.0|   130.0|204.0|0.0|    2.0|  172.0|  0.0|    1.4|  1.0|0.0| 3.0|0.0|
|56.0|1.0|2.0|   120.0|236.0|0.0|    0.0|  178.0|  0.0|    0.8|  1.0|0.0| 3.0|0.0|
|62.0|0.0|4.0|   140.0|268.0|0.0|    2.0|  160.0|  0.0|    3.6|  3.0|2.0| 3.0|3.0|
|57.0|0.0|4.0|   120.0|354.0|0.0|    0.0|  163.0|  1.0|    0.6|  1.0|0.0| 3.0|0.0|
|63.0|1.0|4.0|   130.0|254.0|0.0|    2.0|  147.0|  0.0|    1.4|  2.0|1.0| 7.0|2.0|
|53.

In [5]:
data.columns

['age',
 'sex',
 'cp',
 'trestbps',
 'chol',
 'fbs',
 'restecg',
 'thalach',
 'exang',
 'oldpeak',
 'slope',
 'ca',
 'thal',
 'num']

In [6]:
vassembler = VectorAssembler(inputCols=['age',
 'sex',
 'cp',
 'trestbps',
 'chol',
 'fbs',
 'restecg',
 'thalach',
 'exang',
 'oldpeak',
 'slope',
 'ca',
 'thal'], outputCol="features")
data2 = vassembler.transform(data)
data2.show()

+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+--------------------+
| age|sex| cp|trestbps| chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|num|            features|
+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+--------------------+
|63.0|1.0|1.0|   145.0|233.0|1.0|    2.0|  150.0|  0.0|    2.3|  3.0|0.0| 6.0|0.0|[63.0,1.0,1.0,145...|
|67.0|1.0|4.0|   160.0|286.0|0.0|    2.0|  108.0|  1.0|    1.5|  2.0|3.0| 3.0|2.0|[67.0,1.0,4.0,160...|
|67.0|1.0|4.0|   120.0|229.0|0.0|    2.0|  129.0|  1.0|    2.6|  2.0|2.0| 7.0|1.0|[67.0,1.0,4.0,120...|
|37.0|1.0|3.0|   130.0|250.0|0.0|    0.0|  187.0|  0.0|    3.5|  3.0|0.0| 3.0|0.0|[37.0,1.0,3.0,130...|
|41.0|0.0|2.0|   130.0|204.0|0.0|    2.0|  172.0|  0.0|    1.4|  1.0|0.0| 3.0|0.0|[41.0,0.0,2.0,130...|
|56.0|1.0|2.0|   120.0|236.0|0.0|    0.0|  178.0|  0.0|    0.8|  1.0|0.0| 3.0|0.0|[56.0,1.0,2.0,120...|
|62.0|0.0|4.0|   140.0|268.0|0.0|    2.0|  160.0|  0.0|    3.6| 

In [7]:
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)
data3 = scaler.fit(data2).transform(data2)
data3.show()

+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+--------------------+--------------------+
| age|sex| cp|trestbps| chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|num|            features|      scaledFeatures|
+----+---+---+--------+-----+---+-------+-------+-----+-------+-----+---+----+---+--------------------+--------------------+
|63.0|1.0|1.0|   145.0|233.0|1.0|    2.0|  150.0|  0.0|    2.3|  3.0|0.0| 6.0|0.0|[63.0,1.0,1.0,145...|[6.96152928881618...|
|67.0|1.0|4.0|   160.0|286.0|0.0|    2.0|  108.0|  1.0|    1.5|  2.0|3.0| 3.0|2.0|[67.0,1.0,4.0,160...|[7.40353114842356...|
|67.0|1.0|4.0|   120.0|229.0|0.0|    2.0|  129.0|  1.0|    2.6|  2.0|2.0| 7.0|1.0|[67.0,1.0,4.0,120...|[7.40353114842356...|
|37.0|1.0|3.0|   130.0|250.0|0.0|    0.0|  187.0|  0.0|    3.5|  3.0|0.0| 3.0|0.0|[37.0,1.0,3.0,130...|[4.08851720136823...|
|41.0|0.0|2.0|   130.0|204.0|0.0|    2.0|  172.0|  0.0|    1.4|  1.0|0.0| 3.0|0.0|[41.0,0.0,2.0,130...|[4.53051906097561...|


In [8]:
data_final2 = data3.select("scaledFeatures", "num")
data_final = data2.select("features", "num")
train, test = data_final.randomSplit([0.8, 0.2], seed=1)
train2, test2 = data_final2.randomSplit([0.8, 0.2], seed=1)

In [9]:
rf = RandomForestClassifier(labelCol="num", featuresCol="features")
nb = NaiveBayes(labelCol="num", featuresCol="features")
rn = MultilayerPerceptronClassifier(labelCol="num", featuresCol="scaledFeatures", layers=[13, 14, 5])
dt = DecisionTreeClassifier(labelCol="num", featuresCol="features")

In [10]:
rf_grid = ParamGridBuilder()\
    .addGrid(rf.numTrees, [3, 5])\
    .addGrid(rf.impurity, ["gini", "entropy"])\
    .build()
rn_grid = ParamGridBuilder()\
    .addGrid(rn.solver, ["l-bfgs", "gd"])\
    .build()
dt_grid = ParamGridBuilder()\
    .addGrid(dt.impurity, ["gini", "entropy"])\
    .build()

In [11]:
evaluator = MulticlassClassificationEvaluator(labelCol="num", metricName="f1")
rf_cv = CrossValidator(estimator=rf, estimatorParamMaps=rf_grid, evaluator=evaluator)
rf_cvModel = rf_cv.fit(train)
rn_cv = CrossValidator(estimator=rn, estimatorParamMaps=rn_grid, evaluator=evaluator)
rn_cvModel = rn_cv.fit(train2)
dt_cv = CrossValidator(estimator=dt, estimatorParamMaps=dt_grid, evaluator=evaluator)
dt_cvModel = dt_cv.fit(train)

In [12]:
print(f"Mejores parámetros (Random Forest): NumTrees: {rf_cvModel.bestModel.getNumTrees}, Impurity: {rf_cvModel.bestModel.getImpurity()}")
print("Mejores parámetros (Redes Neuronales): Solver: ", rn_cvModel.bestModel.getSolver())
print("Mejores parámetros (Decision Tree): Impurity:", dt_cvModel.bestModel.getImpurity())

Mejores parámetros (Random Forest): NumTrees: 5, Impurity: entropy
Mejores parámetros (Redes Neuronales): Solver:  l-bfgs
Mejores parámetros (Decision Tree): Impurity: gini


In [13]:
rf = RandomForestClassifier(labelCol="num", featuresCol="features", numTrees=5, impurity="entropy")
rf_modelo = rf.fit(train,)
nb = NaiveBayes(labelCol="num", featuresCol="features")
nb_modelo = nb.fit(train,)
rn = MultilayerPerceptronClassifier(labelCol="num", featuresCol="scaledFeatures", layers=[13, 14, 5], solver="l-bfgs")
rn_modelo = rn.fit(train2,)
dt = DecisionTreeClassifier(labelCol="num", featuresCol="features", impurity="entropy")
dt_modelo = dt.fit(train,)

In [14]:
rf_prediccion = rf_modelo.transform(test)
nb_prediccion = nb_modelo.transform(test)
rn_prediccion = rn_modelo.transform(test2)
dt_prediccion = dt_modelo.transform(test)

In [15]:
evaluator = MulticlassClassificationEvaluator(labelCol="num", metricName="f1")
rf_auc = evaluator.evaluate(rf_prediccion)
print("Área debajo de la curva (Random Forest) = %g" % rf_auc)
nb_auc = evaluator.evaluate(nb_prediccion)
print("Área debajo de la curva (Naive Bayes) = %g" % nb_auc)
rn_auc = evaluator.evaluate(rn_prediccion)
print("Área debajo de la curva (Redes Neuronales) = %g" % rn_auc)
dt_auc = evaluator.evaluate(dt_prediccion)
print("Área debajo de la curva (Decision Tree) = %g" % dt_auc)

Área debajo de la curva (Random Forest) = 0.566684
Área debajo de la curva (Naive Bayes) = 0.468344
Área debajo de la curva (Redes Neuronales) = 0.580652
Área debajo de la curva (Decision Tree) = 0.402617
