In [1]:
#Bibliotecas para poder trabajar con Spark
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz
#Configuración de Spark con Python
!pip install -q findspark
!pip install pyspark

#Estableciendo variable de entorno
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

#Buscando e inicializando la instalación de Spark
import findspark
findspark.init()
findspark.find()

[33m0% [Working][0m            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 5,484 B/110[0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
[33m0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 38.8 kB/110[0m[33m0% [Connecting to archive.ubuntu.com (185.125.190.39)] [1 InRelease 72.1 kB/110[0m[33m0% [Connecting to archive.ubuntu.com (185.125.190.39)] [Connecting to ppa.launc[0m                                                                               Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
[33m0% [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.52[0m                                                                               Hit:4 http://ar

'/content/spark-3.5.0-bin-hadoop3'

In [2]:
# Start (or reuse) the Spark session for this notebook; displaying it confirms PySpark works.
from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder
    .appName('Modulo_1')
    .getOrCreate()
)
spark_session

In [4]:
# Load the breast-cancer CSV: the first row holds the column names and Spark
# infers each column's type from the values.
df_spark = spark_session.read.csv(
    'sample_data/cancer.csv',
    header=True,
    inferSchema=True,
)

# Preview the first five rows of the PySpark DataFrame.
df_spark.show(n=5)

+--------+---------+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+
|      id|diagnosis|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|
+--------+---------+-----------+------------+---

Mostramos el esquema para poder ver cuáles atributos son importantes para nuestra predicción.

In [5]:
# Print the schema Spark inferred: every column name with its data type and nullability.
df_spark.printSchema()

root
 |-- id: integer (nullable = true)
 |-- diagnosis: string (nullable = true)
 |-- radius_mean: double (nullable = true)
 |-- texture_mean: double (nullable = true)
 |-- perimeter_mean: double (nullable = true)
 |-- area_mean: double (nullable = true)
 |-- smoothness_mean: double (nullable = true)
 |-- compactness_mean: double (nullable = true)
 |-- concavity_mean: double (nullable = true)
 |-- concave points_mean: double (nullable = true)
 |-- symmetry_mean: double (nullable = true)
 |-- fractal_dimension_mean: double (nullable = true)
 |-- radius_se: double (nullable = true)
 |-- texture_se: double (nullable = true)
 |-- perimeter_se: double (nullable = true)
 |-- area_se: double (nullable = true)
 |-- smoothness_se: double (nullable = true)
 |-- compactness_se: double (nullable = true)
 |-- concavity_se: double (nullable = true)
 |-- concave points_se: double (nullable = true)
 |-- symmetry_se: double (nullable = true)
 |-- fractal_dimension_se: double (nullable = true)
 |-- radi

Vamos a eliminar la columna `id` porque es solo un identificador y no es importante para la predicción que buscamos.


In [6]:
# The identifier column carries no signal for the prediction; drop it (no-op if already dropped).
df_spark = df_spark.drop("id")

Convertimos la variable categórica `diagnosis` a numérica, porque nuestro modelo no puede procesar strings.

In [7]:
# Encode the categorical `diagnosis` column as a numeric label ('diagnosis#'),
# because the model cannot consume string columns.
from pyspark.ml.feature import StringIndexer

# Order the labels alphabetically so the index assigned to each class is fixed.
# The default ("frequencyDesc") depends on which class happens to be more frequent,
# which would silently flip the meaning of 0.0/1.0 if the class balance changed.
indexer = StringIndexer(inputCol='diagnosis', outputCol='diagnosis#',
                        stringOrderType='alphabetAsc')
indexer_fitted = indexer.fit(df_spark)

# The classifier trained later is configured with numClasses = 2.
assert len(indexer_fitted.labelsArray[0]) == 2, indexer_fitted.labelsArray

df_indexed = indexer_fitted.transform(df_spark).drop("diagnosis")
df_indexed.show(5)

+-----------+------------+--------------+---------+---------------+----------------+--------------+-------------------+-------------+----------------------+---------+----------+------------+-------+-------------+--------------+------------+-----------------+-----------+--------------------+------------+-------------+---------------+----------+----------------+-----------------+---------------+--------------------+--------------+-----------------------+----------+
|radius_mean|texture_mean|perimeter_mean|area_mean|smoothness_mean|compactness_mean|concavity_mean|concave points_mean|symmetry_mean|fractal_dimension_mean|radius_se|texture_se|perimeter_se|area_se|smoothness_se|compactness_se|concavity_se|concave points_se|symmetry_se|fractal_dimension_se|radius_worst|texture_worst|perimeter_worst|area_worst|smoothness_worst|compactness_worst|concavity_worst|concave points_worst|symmetry_worst|fractal_dimension_worst|diagnosis#|
+-----------+------------+--------------+---------+-------------

Dividimos los datos en conjuntos de entrenamiento y prueba con el fin de evaluar el rendimiento de nuestro modelo después de ser entrenado. Asignamos el 70% de los datos al conjunto de entrenamiento para garantizar una cantidad adecuada de ejemplos de entrenamiento, y el 30% restante se destina al conjunto de prueba, ya que contamos con suficientes instancias para utilizar esta configuración. Fijamos una semilla (`seed=42`) para que la división sea reproducible.

In [8]:
# Split 70% / 30% into training and test sets; the fixed seed makes the split reproducible.
train, test = df_indexed.randomSplit(weights=[0.7, 0.3], seed=42)

Transformamos nuestros datos en un vector ensamblado para que la biblioteca pyspark.ml pueda procesarlos, y utilizamos todas las variables restantes, ya que son pertinentes para la tarea de clasificación que estamos llevando a cabo.

In [9]:
# Pack all remaining attributes into a single vector column, as pyspark.ml expects.
from pyspark.ml.feature import VectorAssembler

label_col = "diagnosis#"

# Every column except the label is a feature. Deriving the list from the schema
# (instead of typing the 30 names by hand) avoids typos and stays in sync with the
# data. The order follows the DataFrame's column order, which is what the
# "feature N" indices in the trained tree refer to.
cols = [c for c in train.columns if c != label_col]
non_numeric = [name for name, dtype in train.dtypes if name in cols and dtype == "string"]
assert not non_numeric, f"non-numeric feature columns: {non_numeric}"

assembler = VectorAssembler(inputCols=cols, outputCol="features")

# Keep only the label and the assembled feature vector.
transformed_train = assembler.transform(train).select(label_col, "features")
transformed_test = assembler.transform(test).select(label_col, "features")
transformed_train.show()

+----------+--------------------+
|diagnosis#|            features|
+----------+--------------------+
|       0.0|[6.981,13.43,43.7...|
|       0.0|[7.691,25.44,48.3...|
|       0.0|[7.76,24.54,47.92...|
|       0.0|[8.196,16.84,51.7...|
|       0.0|[8.219,20.7,53.27...|
|       0.0|[8.597,18.6,54.09...|
|       0.0|[8.671,14.45,54.4...|
|       0.0|[8.726,15.83,55.8...|
|       0.0|[8.734,16.84,55.2...|
|       0.0|[9.0,14.4,56.36,2...|
|       0.0|[9.029,17.33,58.7...|
|       0.0|[9.042,18.9,60.07...|
|       0.0|[9.268,12.87,61.4...|
|       0.0|[9.333,21.94,59.0...|
|       0.0|[9.423,27.88,59.2...|
|       0.0|[9.436,18.32,59.8...|
|       0.0|[9.465,21.01,60.1...|
|       0.0|[9.667,18.49,61.4...|
|       0.0|[9.676,13.14,64.1...|
|       0.0|[9.731,15.34,63.7...|
+----------+--------------------+
only showing top 20 rows



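Dado que planeamos entrenar con la API basada en RDD (`pyspark.mllib`), necesitamos convertir los vectores de `pyspark.ml.linalg` generados por `VectorAssembler` al tipo equivalente de `pyspark.mllib.linalg`, que es el formato compatible con esa API.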

In [10]:
from pyspark.mllib import linalg as mllib_linalg
from pyspark.ml import linalg as ml_linalg


def as_old(v):
    """Convert a ``pyspark.ml.linalg`` vector into the equivalent ``pyspark.mllib.linalg`` vector.

    Args:
        v: a ``pyspark.ml.linalg.DenseVector`` or ``pyspark.ml.linalg.SparseVector``.

    Returns:
        The matching ``pyspark.mllib.linalg`` vector with the same size, indices and values.

    Raises:
        ValueError: if ``v`` is neither a dense nor a sparse ``pyspark.ml`` vector.
    """
    if isinstance(v, ml_linalg.DenseVector):
        return mllib_linalg.DenseVector(v.values)
    if not isinstance(v, ml_linalg.SparseVector):
        raise ValueError("Unsupported type {0}".format(type(v)))
    return mllib_linalg.SparseVector(v.size, v.indices, v.values)

Para concluir, transformamos nuestros vectores en puntos etiquetados (labeled points) y los incluimos en un RDD, ya que la API de `pyspark.mllib` trabaja con RDDs y no con DataFrames.

In [11]:
# Turn our features into LabeledPoints inside an RDD so the RDD-based MLlib API can use them.
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col


def to_labeled_points(df, label_col="diagnosis#", features_col="features"):
    """Convert a (label, ml-vector) DataFrame into an RDD of mllib ``LabeledPoint``s.

    Args:
        df: DataFrame holding the numeric label column and a ``pyspark.ml`` vector column.
        label_col: name of the numeric label column.
        features_col: name of the assembled feature-vector column.

    Returns:
        An RDD with one ``LabeledPoint(label, mllib_vector)`` per row of ``df``.
    """
    return (
        df.select(col(label_col).alias("label"), col(features_col).alias("features"))
        .rdd
        # as_old converts pyspark.ml vectors into the pyspark.mllib type LabeledPoint expects.
        .map(lambda row: LabeledPoint(row.label, as_old(row.features)))
    )


train_dataset = to_labeled_points(transformed_train)
test_dataset = to_labeled_points(transformed_test)

Elegimos emplear un árbol de decisión porque es un clasificador sencillo e interpretable que suele dar buenos resultados en problemas de clasificación con variables numéricas como este.

In [12]:
# This is a classification problem, so we fit a decision tree with PySpark's RDD-based MLlib API.
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel

numClasses = 2                # the indexed label takes two values (0.0 and 1.0)
categoricalFeaturesInfo = {}  # empty map: every feature is treated as continuous
impurity = "gini"

# train_dataset is the RDD of LabeledPoints built in the previous cell.
model = DecisionTree.trainClassifier(
    train_dataset,
    numClasses=numClasses,
    categoricalFeaturesInfo=categoricalFeaturesInfo,
    impurity=impurity,
    maxDepth=5,   # MLlib's default, stated explicitly
    maxBins=32,   # MLlib's default, stated explicitly
)

Evaluamos la eficacia de nuestro modelo utilizando los datos de prueba y calculamos el error, el cual resulta ser bastante bajo: alrededor del 6% (≈ 0.063 en esta ejecución).

In [13]:
# Evaluate on the held-out split. In Python, DecisionTreeModel.predict cannot be
# called inside an RDD transformation, so we predict on the whole feature RDD and
# zip the predictions back with the labels (both derive from test_dataset via map,
# so partitions line up).
test_dataset.cache()  # reused three times below; avoids recomputing the whole lineage

predictions = model.predict(test_dataset.map(lambda x: x.features))
labelsAndPredictions = test_dataset.map(lambda lp: lp.label).zip(predictions)

n_test = test_dataset.count()
if n_test == 0:
    raise ValueError("The test split is empty; cannot compute the test error.")
testErr = labelsAndPredictions.filter(lambda lp: lp[0] != lp[1]).count() / float(n_test)
print('Test Error = ' + str(testErr))
print('Learned classification tree model:')
print(model.toDebugString())

Test Error = 0.06293706293706294
Learned classification tree model:
DecisionTreeModel classifier of depth 5 with 27 nodes
  If (feature 23 <= 868.2)
   If (feature 27 <= 0.13215)
    If (feature 10 <= 1.0085)
     If (feature 27 <= 0.11065)
      Predict: 0.0
     Else (feature 27 > 0.11065)
      If (feature 9 <= 0.05648)
       Predict: 1.0
      Else (feature 9 > 0.05648)
       Predict: 0.0
    Else (feature 10 > 1.0085)
     Predict: 1.0
   Else (feature 27 > 0.13215)
    If (feature 21 <= 27.205)
     If (feature 4 <= 0.12215000000000001)
      If (feature 1 <= 20.675)
       Predict: 0.0
      Else (feature 1 > 20.675)
       Predict: 1.0
     Else (feature 4 > 0.12215000000000001)
      Predict: 1.0
    Else (feature 21 > 27.205)
     If (feature 8 <= 0.15339999999999998)
      Predict: 0.0
     Else (feature 8 > 0.15339999999999998)
      Predict: 1.0
  Else (feature 23 > 868.2)
   If (feature 1 <= 14.975000000000001)
    If (feature 4 <= 0.09582)
     Predict: 0.0
    Else (f

In [14]:
# Persist the trained model so it can be reloaded later if needed, then reload it as a check.
import shutil

from pyspark import SparkContext
from pyspark.mllib.tree import DecisionTreeModel

sc = SparkContext.getOrCreate()

MODEL_PATH = "BC_ClassificationModel.dt"
# MLlib's save() refuses to write into an existing directory, so running this cell
# (or the whole notebook) a second time would fail. Remove any previous copy first.
# NOTE(review): shutil assumes Spark's default filesystem is local, as in this
# Colab setup — use the Hadoop FileSystem API instead if this runs on a cluster.
shutil.rmtree(MODEL_PATH, ignore_errors=True)
model.save(sc, MODEL_PATH)
sameModel = DecisionTreeModel.load(sc, MODEL_PATH)

# The reloaded model must describe exactly the same tree as the one we saved.
assert sameModel.toDebugString() == model.toDebugString()