In [1]:
import random
from collections import Counter

import numpy as np
import pandas as pd
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when, udf, lit, pandas_udf
from pyspark.sql.types import ArrayType, DoubleType
from scipy.spatial import distance

In [2]:
# Obtain the Spark session (getOrCreate returns an existing one when available).
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [3]:
# Load the breast-cancer CSV; column names come from the header row and column
# types are inferred from the data.
datos = spark.read.csv('./breast_cancer/wisc_bc_data.csv', inferSchema=True, header=True)
# Peek at the first record. first() fetches a single row instead of collect()-ing
# the entire dataset to the driver only to index [0].
datos.first()

23/09/10 13:08:19 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Row(id=842302, diagnosis='M', radius_mean=17.99, texture_mean=10.38, perimeter_mean=122.8, area_mean=1001.0, smoothness_mean=0.1184, compactness_mean=0.2776, concavity_mean=0.3001, concave points_mean=0.1471, symmetry_mean=0.2419, fractal_dimension_mean=0.07871, radius_se=1.095, texture_se=0.9053, perimeter_se=8.589, area_se=153.4, smoothness_se=0.006399, compactness_se=0.04904, concavity_se=0.05373, concave points_se=0.01587, symmetry_se=0.03003, fractal_dimension_se=0.006193, radius_worst=25.38, texture_worst=17.33, perimeter_worst=184.6, area_worst=2019.0, smoothness_worst=0.1622, compactness_worst=0.6656, concavity_worst=0.7119, concave points_worst=0.2654, symmetry_worst=0.4601, fractal_dimension_worst=0.1189)

In [4]:
# 'id' is not used as a model feature, so remove it before any processing.
datos = datos.drop("id")

In [5]:
# Readable class names for the single-letter diagnosis codes.
value_mapping = {
    "M" : "Malignant",
    "B" : "Benign"
}
# Rewrite the values in place in the "diagnosis" column. DataFrame.replace applies
# the whole mapping in a single pass, so a newly produced value can never be
# re-mapped by a later key (which chained when/otherwise could do). Values not in
# the mapping, including nulls, are left unchanged.
datos = datos.replace(value_mapping, subset=["diagnosis"])

In [6]:
# Preview the label column on its own (show() prints the first 20 rows by default).
column_diagnosis = datos.select(datos["diagnosis"])
column_diagnosis.show(20)

+---------+
|diagnosis|
+---------+
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|Malignant|
|   Benign|
+---------+
only showing top 20 rows



In [7]:
# Class balance: number of rows per diagnosis and their share of the total, in percent.
total_count = datos.count()

category_counts = datos.groupBy("diagnosis").count()
share_pct = (col("count") / total_count) * 100
category_percentages = category_counts.withColumn("percentage", share_pct)

category_percentages.show()

+---------+-----+------------------+
|diagnosis|count|        percentage|
+---------+-----+------------------+
|   Benign|  357|62.741652021089635|
|Malignant|  212|37.258347978910365|
+---------+-----+------------------+



In [8]:
# Summary statistics (count, mean, stddev, min, max) for three selected features.
summary_targets = ("radius_mean", "area_mean", "smoothness_mean")
df_describe_radius, df_describe_area, df_describe_smoothness = [
    datos.select(feature).describe() for feature in summary_targets
]

# Print one summary table per feature, in the same order as above.
for feature_summary in (df_describe_radius, df_describe_area, df_describe_smoothness):
    feature_summary.show()

+-------+------------------+
|summary|       radius_mean|
+-------+------------------+
|  count|               569|
|   mean|14.127291739894563|
| stddev|3.5240488262120793|
|    min|             6.981|
|    max|             28.11|
+-------+------------------+

+-------+-----------------+
|summary|        area_mean|
+-------+-----------------+
|  count|              569|
|   mean|654.8891036906857|
| stddev|351.9141291816529|
|    min|            143.5|
|    max|           2501.0|
+-------+-----------------+

+-------+--------------------+
|summary|     smoothness_mean|
+-------+--------------------+
|  count|                 569|
|   mean|   0.096360281195079|
| stddev|0.014064128137673616|
|    min|             0.05263|
|    max|              0.1634|
+-------+--------------------+



In [9]:
# Every feature column, i.e. everything except the 'diagnosis' label. The label is
# excluded by name rather than by position (columns[1:]), so the list stays correct
# even if the label column is not the first one.
columns_to_normalize = [c for c in datos.columns if c != "diagnosis"]

columns_to_normalize

['radius_mean',
 'texture_mean',
 'perimeter_mean',
 'area_mean',
 'smoothness_mean',
 'compactness_mean',
 'concavity_mean',
 'concave points_mean',
 'symmetry_mean',
 'fractal_dimension_mean',
 'radius_se',
 'texture_se',
 'perimeter_se',
 'area_se',
 'smoothness_se',
 'compactness_se',
 'concavity_se',
 'concave points_se',
 'symmetry_se',
 'fractal_dimension_se',
 'radius_worst',
 'texture_worst',
 'perimeter_worst',
 'area_worst',
 'smoothness_worst',
 'compactness_worst',
 'concavity_worst',
 'concave points_worst',
 'symmetry_worst',
 'fractal_dimension_worst']

In [10]:
# Z-score helper: centre a value on the mean, then scale by the standard deviation.
def custom_normalize(value, mean, std):
    """Return ``(value - mean) / std``.

    Only ``-`` and ``/`` are used, so it applies element-wise to any operands
    supporting them (plain floats, numpy arrays, pandas Series).
    """
    centered = value - mean
    return centered / std

# Standardise every feature column to a z-score, (x - mean) / sample stddev, and
# add it as "<column>_normalized".
# All means and standard deviations are gathered in ONE aggregation job (instead
# of two Spark jobs per column). The z-score is then built as a native Column
# expression (custom_normalize is plain arithmetic), which avoids the Python/Arrow
# round trip of a pandas UDF.
# NOTE(review): the statistics are computed on the full dataset before the
# train/test split, so test rows influence the scaling. Consider fitting them on
# the training split only.
feature_stats = datos.agg(
    *[F.mean(col(c)).alias(f"mean_{i}") for i, c in enumerate(columns_to_normalize)],
    *[F.stddev(col(c)).alias(f"std_{i}") for i, c in enumerate(columns_to_normalize)],
).first()

normalized_columns = [
    custom_normalize(col(c), lit(feature_stats[f"mean_{i}"]), lit(feature_stats[f"std_{i}"]))
    .alias(c + '_normalized')
    for i, c in enumerate(columns_to_normalize)
]
# One select keeps the query plan flat, unlike a long chain of withColumn calls.
datos = datos.select('*', *normalized_columns)

In [11]:
# Keep only the label plus the standardised copies; the raw feature columns are no longer needed.
datos = datos.drop(*columns_to_normalize)
datos.show(n=2)

                                                                                

+---------+----------------------+-----------------------+-------------------------+--------------------+--------------------------+---------------------------+-------------------------+------------------------------+------------------------+---------------------------------+--------------------+---------------------+-----------------------+------------------+------------------------+-------------------------+-----------------------+----------------------------+----------------------+-------------------------------+-----------------------+------------------------+--------------------------+---------------------+---------------------------+----------------------------+--------------------------+-------------------------------+-------------------------+----------------------------------+
|diagnosis|radius_mean_normalized|texture_mean_normalized|perimeter_mean_normalized|area_mean_normalized|smoothness_mean_normalized|compactness_mean_normalized|concavity_mean_normalized|concave points_me

In [12]:
# Sanity check: after standardisation every feature should have mean ~0 and stddev ~1.
datos.describe().show(n=20)

[Stage 202:>                                                        (0 + 1) / 1]

+-------+---------+----------------------+-----------------------+-------------------------+--------------------+--------------------------+---------------------------+-------------------------+------------------------------+------------------------+---------------------------------+--------------------+---------------------+-----------------------+--------------------+------------------------+-------------------------+-----------------------+----------------------------+----------------------+-------------------------------+-----------------------+------------------------+--------------------------+---------------------+---------------------------+----------------------------+--------------------------+-------------------------------+-------------------------+----------------------------------+
|summary|diagnosis|radius_mean_normalized|texture_mean_normalized|perimeter_mean_normalized|area_mean_normalized|smoothness_mean_normalized|compactness_mean_normalized|concavity_mean_normalized

                                                                                

In [13]:
# Random train/test split (80% / 20%). The fixed seed keeps the split repeatable
# for the same input data and partitioning.
train_ratio = 0.8
test_ratio = 1 - train_ratio
split_weights = [train_ratio, test_ratio]
train_data, test_data = datos.randomSplit(split_weights, seed=150)

In [14]:
# Pack every feature column into a single vector column 'features' and keep only
# label + vector. The label is excluded by name rather than by position
# (columns[1:]), so this does not depend on 'diagnosis' being the first column.
feature_columns = [c for c in datos.columns if c != "diagnosis"]
vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
train_data = vector_assembler.transform(train_data).select("diagnosis", "features")
test_data = vector_assembler.transform(test_data).select("diagnosis", "features")


In [15]:
# Bring both splits to the driver as pandas frames (columns: 'diagnosis', 'features')
# for the hand-written k-NN classifier. Every training example is kept.
train = train_data.toPandas()
test = test_data.toPandas()

                                                                                

In [16]:
def predict(train_dataset, test_dataset, k):
    """Classify every row of ``test_dataset`` by k-nearest-neighbour majority vote.

    Both frames are read positionally: column 0 holds the label and column 1 a
    numeric feature vector (e.g. the 'features' vector from the assembler cell,
    or a list of floats). Distances are Euclidean.

    Parameters
    ----------
    train_dataset : pandas.DataFrame
        Labelled reference points.
    test_dataset : pandas.DataFrame
        Points to classify; only the feature column (position 1) is read.
    k : int
        Number of nearest neighbours that vote.

    Returns
    -------
    list
        One predicted label per test row, in test-row order.

    Raises
    ------
    ValueError
        If ``k`` is not positive or ``train_dataset`` has no rows.

    Ties in the vote are broken deterministically: the tied class whose member
    is closest to the query wins, so results are reproducible run to run.
    """
    if k < 1:
        raise ValueError("k must be a positive integer")
    if len(train_dataset) == 0:
        raise ValueError("train_dataset must contain at least one row")

    # Stack the training vectors once into an (n_train, n_features) matrix so each
    # query needs a single vectorised distance computation.
    train_labels = train_dataset.iloc[:, 0].to_numpy()
    train_features = np.vstack(
        [np.asarray(vector, dtype=float) for vector in train_dataset.iloc[:, 1]]
    )

    predictions = []
    for query in test_dataset.iloc[:, 1]:
        distances = np.linalg.norm(train_features - np.asarray(query, dtype=float), axis=1)
        # Stable sort: equal distances keep training order, like the built-in sorted().
        nearest = np.argsort(distances, kind="stable")[:k]
        nearest_labels = [train_labels[i] for i in nearest]

        votes = Counter(nearest_labels)
        top_count = max(votes.values())
        # nearest_labels is ordered by distance, so the first label reaching the top
        # count is the tied class with the closest neighbour.
        predictions.append(
            next(label for label in nearest_labels if votes[label] == top_count)
        )

    return predictions
            

In [17]:
def evaluate(predictions, labels):
    """Return the accuracy, in percent (0-100), of ``predictions`` against ``labels``.

    Parameters
    ----------
    predictions : iterable
        Predicted labels.
    labels : iterable
        Ground-truth labels, aligned position by position with ``predictions``.

    Returns
    -------
    float
        ``correct * 100 / len(predictions)``.

    Raises
    ------
    ValueError
        If the two sequences differ in length (``zip`` would otherwise silently
        truncate and skew the score) or are empty.
    """
    predictions = list(predictions)
    labels = list(labels)
    if len(predictions) != len(labels):
        raise ValueError(
            f"got {len(predictions)} predictions but {len(labels)} labels"
        )
    if not predictions:
        raise ValueError("cannot compute accuracy of an empty prediction set")

    # Each correct prediction contributes True (== 1) to the sum.
    correct = sum(pred == label for pred, label in zip(predictions, labels))
    return (correct * 100) / len(predictions)

In [18]:
# Classify the test split using the 3 nearest training neighbours,
# then score the predictions against the true test labels.
predictions_test = predict(train, test, k=3)
true_labels = test["diagnosis"].tolist()
accuracy = evaluate(predictions_test, true_labels)

In [19]:
# Test-set accuracy, in percent.
print(f"{accuracy}")

99.12280701754386
