# 4 - Algoritmos de clasificación

Los algoritmos de **clasificación** son uno de los dos tipos en los que pueden divisirse los algoritmos de **aprendizaje supervisado**. Estos algoritmos suponen que partimos de un conjunto de datos etiquetado previamente.

Un algoritmo de clasificación se centra en resolver problemas en los cuales se debe predecir el valor de la variable cualitativa $\mathbf y$ a partir de la variable $\mathbf x$. En clasificación la variable $\mathbf{x}$ se denomina **características** o **atributos** mientras que $\bf y$ se denomina **etiqueta**.

In [None]:
## Solicitar acceso y montar en el sistema tu directorio de Google Drive
# Esto permitirá la persistencia de datos entre distintas sesiones
import os, sys
from google.colab import drive
drive.mount('/content/drive')

# Creamos un directorio para este notebook y lo asociamos
drive_path = '/content/drive/MyDrive/Colab Notebooks/ia-bd-m4-sistemas-de-big-data'
nb_path = '/content/ia-bd-m4-sistemas-de-big-data'

if not os.path.exists(drive_path):
    os.makedirs(drive_path)
os.symlink(drive_path, nb_path)
sys.path.insert(0,nb_path)

%cd $nb_path

Mounted at /content/drive
/content/drive/MyDrive/Colab Notebooks/ia-bd-m4-sistemas-de-big-data


## Instalación de spark en Google Colab

In [None]:
# Spark está escrito en el lenguaje de programación Scala, por lo que requiere
# de una máquina virtual de Java (JVM) para funcionar. Por lo tanto, lo primero
# es instalar java:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
## Instalación de Apache Spark en Google Colab (Ejecutar solo si no se ha realizado nunca)
# Nota: Puede tardar unos minutos (se paciente)
# El siguiente paso es elegir una versión reciente de spark
# En este notebook, se usará spark versión 3.1.2, la cual puede descargarse en:
spark_file = 'spark-3.1.2-bin-hadoop3.2.tgz'
spark_url = 'https://archive.apache.org/dist/spark/spark-3.1.2/' + spark_file

# A continuación, descargamos la versión elegida de spark:
import os # Libreria de manejo del sistema operativo
os.system("wget -q {spark_url} -P " + nb_path) # Realizamos la descarga
os.system("tar xf " + nb_path + "/" + spark_file) # Descomprimimos el fichero .tgz

# Realizamos la instalación de pyspark utilizando la herramienta pip
!pip install --target=$nb_path -q pyspark
!pip install -q findspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m12.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[0m

In [None]:
# Damos permisos de ejecución
!chmod -R +x ./pyspark/

# Finalmente, es necesario definir algunas variables de entorno en el sistema
# operativo para poder usar spark correctamente:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = nb_path + "/pyspark"

## Cargar el conjunto de datos que se utilizará

En este caso utilizaremos un conjunto de datos completo sobre la autentificación de billetes (*banknote_authentication.csv*). Este dataset ha sido obtenido del repositorio [UCI](https://archive.ics.uci.edu/ml/datasets/banknote+authentication).

Los datos se extrajeron de imágenes tomadas de ejemplares de billetes auténticos y falsificados. Para la digitalización se utilizó una cámara industrial utilizada habitualmente para la inspección de impresos. Las imágenes finales tienen 400x 400 píxeles. Debido a la lente del objeto y a la distancia al objeto investigado se obtuvieron imágenes en escala de grises con una resolución de unos 660 ppp. Se utilizó la herramienta de transformación Wavelet para extraer las características de las imágenes.

Hay 1372 observaciones (imágenes de billetes de banco). Hay 4 variables de predicción (varianza de la imagen, asimetría, curtosis, entropía). La variable a predecir se codifica como 0 (billete auténtico) o 1 (billete falso).

In [None]:
# Descargar el dataset que se utilizará utilizando un enlace compartido de google drive
# El fichero puede descargarse directamente de la fuente proporcionada, o se
# puede utilizar el siguiente código para descargarlo de google drive:
# URL: https://drive.google.com/file/d/1CvOH2oT8iptb6vL2eJPmZuukE6O_J1Me
!gdown --id 1CvOH2oT8iptb6vL2eJPmZuukE6O_J1Me

Downloading...
From: https://drive.google.com/uc?id=1CvOH2oT8iptb6vL2eJPmZuukE6O_J1Me
To: /content/drive/MyDrive/Colab Notebooks/ia-bd-m4-sistemas-de-big-data/banknote_authentication.csv
100% 46.4k/46.4k [00:00<00:00, 98.5MB/s]


Creamos un cluster de Spark e importamos el dataframe actual en Spark.

In [None]:
# Importar pyspark.sql
from pyspark.sql import*

# Importar SparkContext and SparkConf
from pyspark import SparkContext, SparkConf

# Establecer las propiedades de Spark:
# - URL de conexión
# - Nombre de la aplicación
conf = SparkConf().setMaster("local").setAppName("4-Algoritmos-de-clasificacion")

# Iniciar un cluster de Spark (puede tardar unos minutos)
# Comprobar si ya existe este cluster y en el caso contrario crear uno nuevo
sc = SparkContext.getOrCreate(conf=conf)

# Mostramos el cluster creado
sc

In [None]:
# Inicializar SQLContext a partir del cluster Spark creado anteriormente
sqlContext = SQLContext(sc)

# Creamos un dataframe a partir del archivo CSV descargado anteriormente y
# que contiene el dataset que utilizaremos en esta sesión
df = sqlContext.read.csv('banknote_authentication.csv', header=True, sep=",", inferSchema = "true")

# Mostrar el contenido del dataframe (las 5 primeras observaciones). Forgery = 0 --> billete verdadero
df.show(5)



+--------+--------+--------+--------+-------+
|variance|skewness|curtosis| entropy|forgery|
+--------+--------+--------+--------+-------+
|  3.6216|  8.6661| -2.8073|-0.44699|      0|
|  4.5459|  8.1674| -2.4586| -1.4621|      0|
|   3.866| -2.6383|  1.9242| 0.10645|      0|
|  3.4566|  9.5228| -4.0112| -3.5944|      0|
| 0.32924| -4.4552|  4.5718| -0.9888|      0|
+--------+--------+--------+--------+-------+
only showing top 5 rows



## Pre-procesamiento del dataset y obtención de conjuntos de entrenamiento y test

Utilizaremos el *VectorAssembler* para generar una nueva columna en el dataframe la cual tendrá un vector del tipo DenseVector conteniendo todas las características del dataset. Recordad que este era un paso necesario antes de aplicar cualquier algoritmo de ML de la biblioteca [MLlib de Spark](https://spark.apache.org/docs/latest/ml-guide.html).

In [None]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
  inputCols = ['variance', 'skewness', 'curtosis', 'entropy'],
  outputCol = "features"
)
df = assembler.transform(df)

# Mostramos la nueva columa "features" para las 5 primeras filas del dataset
df.select("features").show(5, truncate=False)

+--------------------------------+
|features                        |
+--------------------------------+
|[3.6216,8.6661,-2.8073,-0.44699]|
|[4.5459,8.1674,-2.4586,-1.4621] |
|[3.866,-2.6383,1.9242,0.10645]  |
|[3.4566,9.5228,-4.0112,-3.5944] |
|[0.32924,-4.4552,4.5718,-0.9888]|
+--------------------------------+
only showing top 5 rows



Posteriormente procedemos a estandarizar las entradas numéricas con las que contamos. Este es un proceso muy recomendable para la aplicación de algunos de los algoritmos de clasificación, como por ejemplo para la regresión logística. El escalado que aplicaremos será una [estandarízación](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.StandardScaler.html) de desviación estándar la unidad.

Recomendamos al lector consultar otro tipo de escalados utilizados con bastante frecuencia. Por ejemplo: [RobustScaler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.RobustScaler.html), [MinMaxScaler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MinMaxScaler.html) y [MaxAbsScaler](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.MaxAbsScaler.html).

No podemos olvidar que si llegaran nuevos datos para hacer predicciones deberemos aplicar tanto el transform anterior como el escalado siguiente.

In [None]:
# Aplicamos el standard Scaler
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='features',outputCol='standardized')
fit_scaler = scaler.fit(df)
df = fit_scaler.transform(df)

# Mostramos la nueva columa "standardized" para las 5 primeras filas del dataset
df.select("standardized").show(5, truncate=False)

+---------------------------------------------------------------------------------+
|standardized                                                                     |
+---------------------------------------------------------------------------------+
|[1.273972021962275,1.47657709649511,-0.6513411603422402,-0.21274974061406512]    |
|[1.599113489794098,1.391605887067327,-0.5704368527828988,-0.6959023596765579]    |
|[1.3599447307560621,-0.44952785609248086,0.4464469991559643,0.05066603254741098] |
|[1.2159298904116411,1.622546286623006,-0.9306663564153435,-1.7107936814317894]   |
|[0.11581691752564043,-0.7591011274166019,1.0607350539139577,-0.47063008908294945]|
+---------------------------------------------------------------------------------+
only showing top 5 rows



Ahora vamos a dividir el dataset en un conjunto de entrenamiento y otro de test. Gracias a esto, podremos estimar cada modelo sobre el conjunto de entrenamiento, utilizando el conjunto de test para validar los resultados obtenidos.

In [None]:
# Dividimos entre entrenamiento (70%) y test (30%)
(train_df, test_df) = df.randomSplit([0.7, 0.3], seed=100)

print("Número de filas df entrenamiento: {train:d}".format(train=train_df.count()))
print("Número de filas df test: {test:d}".format(test=test_df.count()))

Número de filas df entrenamiento: 968
Número de filas df test: 404


## Regresión logística

Para más información sobre la implementación de PySpark sobre la regresión logistica puede consultar la [documentación de la API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LogisticRegression.html).

**Ajuste de hiperparámetros del modelo.**

Tema complejo que se escapa de los contenidos de este curso. Todos los algoritmos tienen una serie de hiperparámetros para ajustar el modelo. Dependiendo del dataset, el algoritmo se comportará mejor o peor con unos valores u otros para esos hiperparámetros. Hay que intentar ajustarlos sin sobreajustar. Una solución es el tuning de parametros.

Uno de los enfoques para hacer esto es construir una rejilla en función del número de hiperparámetros que queramos ajustar. Para cada uno definimos unos valores.

Mediante el uso del objeto CrossValidator se puede dividir el conjunto de datos en un subconjunto de pliegues (folds) que se utilizan como conjuntos de datos de entrenamiento y de prueba separados. Esta técnica se conoce como cross-validación. Por ejemplo, con k=3 pliegues, CrossValidator generará 3 pares de conjuntos de datos (de entrenamiento, de prueba), cada uno de los cuales utiliza 2/3 de los datos para el entrenamiento y 1/3 para la prueba. Para evaluar cada conjunto de parámetros posibles, CrossValidator calcula la métrica de evaluación media para los 3 modelos producidos al ajustar el estimador en los 3 pares de conjuntos de datos diferentes (entrenamiento, prueba).

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr = LogisticRegression(labelCol="forgery", featuresCol="standardized",
                        maxIter=100)

# Definimos los parámetros del grid donde se buscarán los parámetros óptimos. Con el punto vamos concatenando.
# Los valores 1; 0,1; 0,01; etc --> Experiencia, consulta de documentación sobre el algoritmo...
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [1, 0.1, 0.01, 0.001]) \
    .build()

# Ya tenemos algoritmo y rejilla. Definimos evaluador. En este caso binario (hay más).
# Definimos también sus hiperparámetros
evaluator = BinaryClassificationEvaluator(labelCol="forgery", rawPredictionCol="prediction")

# Definimos la cross-validación
crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Ejecutar la cross-validación y elegir el mejor conjunto de parámetros
# En este caso solo se buscó uno
cvModel = crossval.fit(train_df)

print("RegParam: " + str(cvModel.bestModel.getRegParam()))

RegParam: 0.001


**Entrenamiento del modelo.**

In [None]:
# Entrenar un modelo de regresión logistica binario (2 clases)
from pyspark.ml.classification import LogisticRegression

# Hiperparámetros. Forgery seria laetiqueta, standardized el dense vector escalado. Los otros dos se han ajustado en lacelda anterior.
# Según el algoritmo puede que nos vengan dados. En otros casos se calculan como en la celda anterior
lr = LogisticRegression(labelCol="forgery", featuresCol="standardized",
                        maxIter=100, regParam=0.001)
# Modelo ajustado
lr_model = lr.fit(train_df)

# Aplicamos el modelo sobre el conjunto de test
pred = lr_model.transform(test_df)

# Mostramos las clases reales junto con las predicciones realizadas
pred.select("forgery", "features", "prediction").show(10)

+-------+--------------------+----------+
|forgery|            features|prediction|
+-------+--------------------+----------+
|      1|[-6.7526,8.8172,-...|       1.0|
|      1|[-6.3979,6.4479,1...|       1.0|
|      1|[-6.3679,8.0102,0...|       1.0|
|      1|[-6.2003,8.6806,0...|       1.0|
|      1|[-6.1632,8.7096,-...|       1.0|
|      1|[-5.8818,7.6584,0...|       1.0|
|      1|[-5.873,9.1752,-0...|       1.0|
|      1|[-5.4414,7.2363,0...|       1.0|
|      1|[-5.2406,6.6258,-...|       1.0|
|      1|[-5.0676,-5.1877,...|       1.0|
+-------+--------------------+----------+
only showing top 10 rows



In [None]:
# Mostrar los coeficientes y los términos de interceptación de la regresión logistica
# Al final se trata de una recta
print("Coeficientes (1 por variable): " + str(lr_model.coefficientMatrix))
print("Terminos de interceptación: " + str(lr_model.interceptVector))

Coeficientes (1 por variable): DenseMatrix([[-4.60923365, -4.66366829, -4.24543597,  0.23898744]])

Terminos de interceptación: [2.332104765021363]


**Evaluación del modelo.**

Vamos a ver cómo se comporta el modelo en los datos de test, utilizando el valor de la exactitud o **accuracy** que mide el porcentaje de casos que el modelo ha acertado (fracción de predicciones correctas): predicciones correctas / número total de elementos.

Se debe tener cuidado, pues esta es una métrica muy engañosa. De hecho, si tuviésemos un modelo que siempre predijera siempre una de las clases, el resultado sería el porcentaje de elementos de dicha clase. ¿Qué pasa si el problema está desbalanceado y hay muchas observaciones de dicha clase? Se deben tener en cuenta más indicadores. No obstante, esto queda fuera de los contenidos de este curso y se estudiarán en el módulo de Sistemas de Aprendizaje Automático. Problema con un modelo dummy que sobre un dataset 90-10 siempre evalue cierto.

En el módulo SAA se aprenderán diferentes métricas para evaluar los algoritmos.

En este caso se va a usar un **MulticlassClassificationEvaluator** que también funciona para clases binarias.

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="forgery", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred)
print("Accuracy: {}".format(accuracy))

Accuracy: 0.9801980198019802


Podremos observar también la **matriz de confusión**.
La matriz de confusión es una herramienta muy útil para valorar cómo de bueno es un modelo clasificación. En particular, sirve para mostrar de forma explícita cuándo una clase es confundida con otra. Cada columna de la matriz representa el número de predicciones de cada clase, mientras que cada fila representa a las instancias en la clase real.

Para ello, lo primero que haremos será crear un nuevo dataframe solo con las prediciones y la clase real (los resultados corectos, es decir, la columna "forgery"). Posteriormente, tenemos que hacer un casting a la columna "forgery" para pasarla de entero a float, ya que la función para obtener la matriz de confusión solo funcionará con floats. Por último, mapeamos el dataframe en un RDD como una tupla y pasamos esta a la función MulticlassMetrics de MLlib.

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Selecionamos solo las prediciones y los valores reales (forgery)
preds_and_labels = pred.select(['prediction','forgery'])
# Hacemos un casting de entero a float para la columna forgery
preds_and_labels = preds_and_labels.withColumn("forgery", preds_and_labels["forgery"].cast('float'))
# Pasamos el resultado anterior a un objeto RDD (Resilient Distributed Datasets) de tuplas
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Mostramos la matriz de confusión
# Filas valores reales, columnas valores predichos.
# Primera clase (billetes verdaderos) ha predicho 218 reales (que lo eran) y 1 que era verdadero (que era falso (falso negativo)).
# Para los billetes que ha clasificado como falsos (segunda columna). 7 ha dicho que eran falsos, siendo positivos (falso positivo) y 178 falsos siendo falsos.
print(metrics.confusionMatrix().toArray())



[[218.   7.]
 [  1. 178.]]


## Máquinas de Vectores de Soporte (SVM) - Lineales

Para más información sobre la implementación de PySpark sobre las Máquinas de Vectores de Soporte (SVM) puede consultar la [documentación de la API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.LinearSVC.html).

**Ajuste de hiperparámetros del modelo.**

In [None]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lsvc = LinearSVC(labelCol="forgery", featuresCol="standardized",
                 maxIter=100)

# Definimos los parámetros del grid donde se buscarán los parámetros óptimos
paramGrid = ParamGridBuilder() \
    .addGrid(lsvc.regParam, [1, 0.1, 0.01, 0.001]) \
    .build()

evaluator = BinaryClassificationEvaluator(labelCol="forgery", rawPredictionCol="prediction")

# Definimos la cross-validación
crossval = CrossValidator(estimator=lsvc,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Ejecutar la cross-validación y elegir el mejor conjunto de parámetros
cvModel = crossval.fit(train_df)

print("RegParam: " + str(cvModel.bestModel.getRegParam()))

RegParam: 0.001


**Entrenamiento del modelo.**

In [None]:
from pyspark.ml.classification import LinearSVC

lsvc = LinearSVC(labelCol="forgery", featuresCol="standardized",
                 maxIter=100, regParam=0.001)

lsvc_model = lsvc.fit(train_df)
pred = lsvc_model.transform(test_df)

# Mostramos las clases reales junto con las predicciones realizadas
pred.select("forgery", "features", "prediction").show(10)

+-------+--------------------+----------+
|forgery|            features|prediction|
+-------+--------------------+----------+
|      1|[-6.7526,8.8172,-...|       1.0|
|      1|[-6.3979,6.4479,1...|       1.0|
|      1|[-6.3679,8.0102,0...|       1.0|
|      1|[-6.2003,8.6806,0...|       1.0|
|      1|[-6.1632,8.7096,-...|       1.0|
|      1|[-5.8818,7.6584,0...|       1.0|
|      1|[-5.873,9.1752,-0...|       1.0|
|      1|[-5.4414,7.2363,0...|       1.0|
|      1|[-5.2406,6.6258,-...|       1.0|
|      1|[-5.0676,-5.1877,...|       1.0|
+-------+--------------------+----------+
only showing top 10 rows



**Evaluación del modelo.**

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="forgery", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred)
print("Accuracy: {}".format(accuracy))

Accuracy: 0.9826732673267327


Obtenemos la matriz de confusión.

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Selecionamos solo las prediciones y los valores reales (forgery)
preds_and_labels = pred.select(['prediction','forgery'])
# Hacemos un casting de entero a float para la columna forgery
preds_and_labels = preds_and_labels.withColumn("forgery", preds_and_labels["forgery"].cast('float'))
# Pasamos el resultado anterior a un objeto RDD (Resilient Distributed Datasets) de tuplas
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Mostramos la matriz de confusión
print(metrics.confusionMatrix().toArray())



[[219.   6.]
 [  1. 178.]]


## Árboles de decisión - Clasificador Random Forest

Para más información sobre la implementación de PySpark sobre el clasificador Random Forest puede consultar la [documentación de la API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.RandomForestClassifier.html).

**Ajuste de hiperparámetros del modelo.**

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

rf = RandomForestClassifier(labelCol="forgery", featuresCol="standardized")

# Definimos los parámetros del grid donde se buscarán los parámetros óptimos
paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [3, 6, 10]) \
    .addGrid(rf.numTrees, [50, 100, 150, 250]) \
    .build()

evaluator = BinaryClassificationEvaluator(labelCol="forgery", rawPredictionCol="prediction")

# Definimos la cross-validación
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Ejecutar la cross-validación y elegir el mejor conjunto de parámetros
cvModel = crossval.fit(train_df)

print("MaxDepth: " + str(cvModel.bestModel.getMaxDepth()))
print("NumTrees: " + str(cvModel.bestModel.getNumTrees))

MaxDepth: 10
NumTrees: 50


**Entrenamiento del modelo.**

In [None]:
# Entrenar un clasificador Random Forest
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="forgery", featuresCol="standardized",
                            maxDepth=10, numTrees=100, seed=10)

rf_model = rf.fit(train_df)
pred = rf_model.transform(test_df)

# Mostramos las clases reales junto con las predicciones realizadas
pred.select("forgery", "features", "prediction").show(10)

+-------+--------------------+----------+
|forgery|            features|prediction|
+-------+--------------------+----------+
|      1|[-6.7526,8.8172,-...|       1.0|
|      1|[-6.3979,6.4479,1...|       1.0|
|      1|[-6.3679,8.0102,0...|       1.0|
|      1|[-6.2003,8.6806,0...|       1.0|
|      1|[-6.1632,8.7096,-...|       1.0|
|      1|[-5.8818,7.6584,0...|       1.0|
|      1|[-5.873,9.1752,-0...|       1.0|
|      1|[-5.4414,7.2363,0...|       1.0|
|      1|[-5.2406,6.6258,-...|       1.0|
|      1|[-5.0676,-5.1877,...|       1.0|
+-------+--------------------+----------+
only showing top 10 rows



**Evaluación del modelo.**

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="forgery", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred)
print("Accuracy: {}".format(accuracy))

Accuracy: 0.995049504950495


Obtenemos ahora la matriz de confusión. Vemos como obtenemos muy buenos resultados con este modelo (para este dataset concreto). **En general, la técnica Random Forest (así como otras derivadas de esta, como el gradient boosting) es una de las mejores técnicas de clasificación.**

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Selecionamos solo las prediciones y los valores reales (forgery)
preds_and_labels = pred.select(['prediction','forgery'])
# Hacemos un casting de entero a float para la columna forgery
preds_and_labels = preds_and_labels.withColumn("forgery", preds_and_labels["forgery"].cast('float'))
# Pasamos el resultado anterior a un objeto RDD (Resilient Distributed Datasets) de tuplas
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Mostramos la matriz de confusión
print(metrics.confusionMatrix().toArray())

[[225.   0.]
 [  2. 177.]]


## Árboles de Decisión - Gradient-boosted tree classifier

Para más información sobre la implementación de PySpark sobre el algoritmo gradient-boosted tree classifier puede consultar la [documentación de la API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.GBTClassifier.html).

Más avanzado que el anterior. Puede usarse con datasets de terabytes. Existen implementaciones que no usan spark. Muy usado (y premiado) en torneos de ML.

**Ajuste de hiperparámetros del modelo.**

In [None]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

gbt = GBTClassifier(labelCol="forgery", featuresCol="standardized")

# Definimos los parámetros del grid donde se buscarán los parámetros óptimos
paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [3, 6, 10]) \
    .build()

evaluator = BinaryClassificationEvaluator(labelCol="forgery", rawPredictionCol="prediction")

# Definimos la cross-validación
crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=3)

# Ejecutar la cross-validación y elegir el mejor conjunto de parámetros
cvModel = crossval.fit(train_df)

print("MaxDepth: " + str(cvModel.bestModel.getMaxDepth()))

MaxDepth: 6


**Entrenamiento del modelo.**

In [None]:
# Entrenar un clasificador Random Forest
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="forgery", featuresCol="standardized",
                            maxDepth=10, seed=10)

gbt_model = gbt.fit(train_df)
pred = gbt_model.transform(test_df)

# Mostramos las clases reales junto con las predicciones realizadas
pred.select("forgery", "features", "prediction").show(10)

+-------+--------------------+----------+
|forgery|            features|prediction|
+-------+--------------------+----------+
|      1|[-6.7526,8.8172,-...|       1.0|
|      1|[-6.3979,6.4479,1...|       1.0|
|      1|[-6.3679,8.0102,0...|       1.0|
|      1|[-6.2003,8.6806,0...|       1.0|
|      1|[-6.1632,8.7096,-...|       1.0|
|      1|[-5.8818,7.6584,0...|       1.0|
|      1|[-5.873,9.1752,-0...|       1.0|
|      1|[-5.4414,7.2363,0...|       1.0|
|      1|[-5.2406,6.6258,-...|       1.0|
|      1|[-5.0676,-5.1877,...|       1.0|
+-------+--------------------+----------+
only showing top 10 rows



**Evaluación del modelo.**

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="forgery", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred)
print("Accuracy: {}".format(accuracy))

Accuracy: 0.9900990099009901


Obtenemos ahora la matriz de confusión.

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Selecionamos solo las prediciones y los valores reales (forgery)
preds_and_labels = pred.select(['prediction','forgery'])
# Hacemos un casting de entero a float para la columna forgery
preds_and_labels = preds_and_labels.withColumn("forgery", preds_and_labels["forgery"].cast('float'))
# Pasamos el resultado anterior a un objeto RDD (Resilient Distributed Datasets) de tuplas
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Mostramos la matriz de confusión
print(metrics.confusionMatrix().toArray())

[[224.   1.]
 [  3. 176.]]


## Redes neuronales

Para más información sobre la implementación de PySpark de las redes neuronales a través del **Multilayer Perceptron** puede consultar la [documentación de la API](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.classification.MultilayerPerceptronClassifier.html).

Si se quisiera usar otro tipo de red neuronal más compleja habría que salir de Spark y buscar paquetes específicos. El uno que se va  aajustar es el número de capas y el número de neuronas por capa. Tener en cuenta que las redes con sólo una capa oculta se consideran clasificadores universales ya que serían capacesde aproximar cualquier función.

En este caso no se ha realizado ajuste de hiperparámetros porque es mucho más complejo y se escapa a los contenidos del curso. Solo se ha ajustado a ojo el número de capas y las neuronas por capa. En muchas ocasiones se prefiere usar algoritmos más fáciles de ajustar que las redes neuronales.

**Entrenamiento del modelo.**

In [None]:
# Entrenar un Multilayer Perceptron (un tipo de red neuronal feedforward)
from pyspark.ml.classification import MultilayerPerceptronClassifier

# Especificamos las capas de neuronas de la red neuronal
# Capa de entrada de tamaño 4 (las features), dos capas ocultas de tamaño
# 10, y finalmente una capa de salida de 2 (las clases)
layers = [4, 10, 10, 2]

mlp = MultilayerPerceptronClassifier(labelCol="forgery", featuresCol="standardized",
                                     maxIter=100, layers=layers, seed=10)

mlp_model = mlp.fit(train_df)
pred = mlp_model.transform(test_df)

# Mostramos las clases reales junto con las predicciones realizadas
pred.select("forgery", "features", "prediction").show(10)

+-------+--------------------+----------+
|forgery|            features|prediction|
+-------+--------------------+----------+
|      1|[-6.7526,8.8172,-...|       1.0|
|      1|[-6.3979,6.4479,1...|       1.0|
|      1|[-6.3679,8.0102,0...|       1.0|
|      1|[-6.2003,8.6806,0...|       1.0|
|      1|[-6.1632,8.7096,-...|       1.0|
|      1|[-5.8818,7.6584,0...|       1.0|
|      1|[-5.873,9.1752,-0...|       1.0|
|      1|[-5.4414,7.2363,0...|       1.0|
|      1|[-5.2406,6.6258,-...|       1.0|
|      1|[-5.0676,-5.1877,...|       1.0|
+-------+--------------------+----------+
only showing top 10 rows



**Evaluación del modelo.**

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(
    labelCol="forgery", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(pred)
print("Accuracy: {}".format(accuracy))

Accuracy: 1.0


Obtenemos ahora la matriz de confusión.

In [None]:
from pyspark.mllib.evaluation import MulticlassMetrics

# Selecionamos solo las prediciones y los valores reales (forgery)
preds_and_labels = pred.select(['prediction','forgery'])
# Hacemos un casting de entero a float para la columna forgery
preds_and_labels = preds_and_labels.withColumn("forgery", preds_and_labels["forgery"].cast('float'))
# Pasamos el resultado anterior a un objeto RDD (Resilient Distributed Datasets) de tuplas
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))

# Mostramos la matriz de confusión
print(metrics.confusionMatrix().toArray())

[[225.   0.]
 [  0. 179.]]
