#Integrantes:
#1. Yesenia Perez Brito
#2. Yessika Aguilar Fonseca

DataSet asignado: Breast Cancer Data Set

# 0. Imports y sesión Spark

In [19]:
!pip install pyspark



In [20]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline, PipelineModel

# 1. Crear sesión Spark y cargar datos

In [21]:
spark = SparkSession.builder.appName("BreastCancerProject").getOrCreate()
df = spark.read.csv("breast-cancer.data", header=False, inferSchema=True)


**Renombrar columnas según descripción dataset**

In [22]:
cols = ["Class", "age", "menopause", "tumor_size", "inv_nodes", "node_caps",
        "deg_malig", "breast", "breast_quad", "irradiat"]
df = df.toDF(*cols)

# a) Imprimir esquema y primeras 10 filas
df.printSchema()
df.show(10, truncate=False)

# b) Mostrar valores posibles etiqueta (Class)
df.select("Class").distinct().show()

root
 |-- Class: string (nullable = true)
 |-- age: string (nullable = true)
 |-- menopause: string (nullable = true)
 |-- tumor_size: string (nullable = true)
 |-- inv_nodes: string (nullable = true)
 |-- node_caps: string (nullable = true)
 |-- deg_malig: integer (nullable = true)
 |-- breast: string (nullable = true)
 |-- breast_quad: string (nullable = true)
 |-- irradiat: string (nullable = true)

+--------------------+-----+---------+----------+---------+---------+---------+------+-----------+--------+
|Class               |age  |menopause|tumor_size|inv_nodes|node_caps|deg_malig|breast|breast_quad|irradiat|
+--------------------+-----+---------+----------+---------+---------+---------+------+-----------+--------+
|no-recurrence-events|30-39|premeno  |30-34     |0-2      |no       |3        |left  |left_low   |no      |
|no-recurrence-events|40-49|premeno  |20-24     |0-2      |no       |2        |right |right_up   |no      |
|no-recurrence-events|40-49|premeno  |20-24     |0-2  

# 2. Limpieza y preprocesamiento
Reemplazar '?' por None y eliminar filas con nulos

In [23]:
for c in df.columns:
    df = df.withColumn(c, col(c).cast("string"))
df = df.replace("?", None)
df_clean = df.dropna()

**Cast deg_malig a double (numérico)**

In [25]:
df_clean = df_clean.withColumn("deg_malig_num", col("deg_malig").cast("double"))

**Variables categóricas para indexar y codificar (menos etiqueta Class)**

In [26]:
cat_cols = ["age", "menopause", "tumor_size", "inv_nodes", "node_caps", "breast", "breast_quad", "irradiat"]
indexers = [StringIndexer(inputCol=c, outputCol=c + "_idx", handleInvalid="skip") for c in cat_cols]
encoder = OneHotEncoder(inputCols=[c + "_idx" for c in cat_cols], outputCols=[c + "_oh" for c in cat_cols])

**Indexar la etiqueta Class**

In [27]:
label_indexer = StringIndexer(inputCol="Class", outputCol="label", handleInvalid="skip")

**VectorAssembler para formar vector de características**

In [28]:
assembler = VectorAssembler(
    inputCols=[c + "_oh" for c in cat_cols] + ["deg_malig_num"],
    outputCol="features"
)

# Pipeline completo de preprocesamiento
preprocess_pipeline = Pipeline(stages=indexers + [encoder, label_indexer, assembler])
preprocess_model = preprocess_pipeline.fit(df_clean)
data_prepared = preprocess_model.transform(df_clean)

# Seleccionar solo features y label
data_final = data_prepared.select("features", "label")

# 3a) Dividir datos 70% entrenamiento, 30% pruebas

In [29]:
train, test = data_final.randomSplit([0.7, 0.3], seed=42)

**Definir evaluador**

In [30]:
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

def train_and_evaluate(estimator, paramGrid):
    cv = CrossValidator(estimator=estimator, estimatorParamMaps=paramGrid,
                        evaluator=evaluator, numFolds=5, parallelism=2)
    cv_model = cv.fit(train)
    predictions = cv_model.transform(test)  # <-- aquí se genera predictions
    # Mostrar matriz de confusión dentro de la función
    predictions.groupBy("label", "prediction").count().orderBy("label", "prediction").show()
    accuracy = evaluator.evaluate(predictions)
    print(f"Accuracy: {accuracy}")
    return cv_model, accuracy


# 3b) Modelos y ajuste hiperparámetros

In [31]:
# Logistic Regression
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=50)
lr_paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.01, 0.1, 1.0]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()
lr_model, lr_acc = train_and_evaluate(lr, lr_paramGrid)

# Decision Tree
dt = DecisionTreeClassifier(featuresCol="features", labelCol="label")
dt_paramGrid = ParamGridBuilder().addGrid(dt.maxDepth, [3, 5, 7]).addGrid(dt.maxBins, [16, 32]).build()
dt_model, dt_acc = train_and_evaluate(dt, dt_paramGrid)

# Random Forest
rf = RandomForestClassifier(featuresCol="features", labelCol="label")
rf_paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [50, 100]).addGrid(rf.maxDepth, [5, 8]).build()
rf_model, rf_acc = train_and_evaluate(rf, rf_paramGrid)

accuracies = {"lr": lr_acc, "dt": dt_acc, "rf": rf_acc}
best_model_name = max(accuracies, key=accuracies.get)
print(f"Mejor modelo: {best_model_name} con accuracy {accuracies[best_model_name]}")

# 3d) Guardar mejor modelo
if best_model_name == "lr":
    best_model = lr_model.bestModel
elif best_model_name == "dt":
    best_model = dt_model.bestModel
else:
    best_model = rf_model.bestModel

# Guardar pipeline de preprocesamiento + mejor modelo en disco
full_pipeline_model = PipelineModel(stages=preprocess_model.stages + [best_model])
full_pipeline_model.write().overwrite().save("best_breast_cancer_model")

spark.stop()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|   42|
|  0.0|       1.0|    9|
|  1.0|       0.0|   13|
|  1.0|       1.0|    7|
+-----+----------+-----+

Accuracy: 0.6901408450704225
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|   47|
|  0.0|       1.0|    4|
|  1.0|       0.0|   12|
|  1.0|       1.0|    8|
+-----+----------+-----+

Accuracy: 0.7746478873239436
+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  0.0|       0.0|   45|
|  0.0|       1.0|    6|
|  1.0|       0.0|   13|
|  1.0|       1.0|    7|
+-----+----------+-----+

Accuracy: 0.7323943661971831
Mejor modelo: dt con accuracy 0.7746478873239436
