# Programación Big Data.

## Autor: Victor Simo Lozano

## Actividad 8
<p>Spark MLib.</p>
    

<hr style="border-color:red">

<div style="font-size:14px; text-align:justify"><b>PRIMERA PARTE.-</b><br></div>

Realizar predicción de Iris Dataset usando **Spark MLib**

<div style="font-size:14px; text-align:justify"><b>1: </b>Importación de librería.</div>

In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder, OneHotEncoderModel
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pandas as pd

from pyspark.sql import SparkSession
import pyspark.sql.functions as sql

<div style="font-size:14px; text-align:justify"><b>2: </b>Crear la sesión Spark y lectura del set de datos.</div>

In [2]:
# Nombre de la sesión 'irisDataset'
spark = SparkSession.builder.appName('irisDataset').getOrCreate()
# Cargamos el set de datos del archivo csv
df = spark.read.csv('iris.csv', header = True).cache()
# Mostrar dataframe
df.show(10)

22/07/14 08:10:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
|         5.4|        3.9|         1.7|        0.4| setosa|
|         4.6|        3.4|         1.4|        0.3| setosa|
|         5.0|        3.4|         1.5|        0.2| setosa|
|         4.4|        2.9|         1.4|        0.2| setosa|
|         4.9|        3.1|         1.5|        0.1| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 10 rows



                                                                                

<div style="font-size:14px; text-align:justify"><b>3: </b>Análisis del set de datos.</div>

In [3]:
# Numero de datos del dataframe
df.count()

153

In [4]:
# Esquema de los datos
df.printSchema()

root
 |-- sepal_length: string (nullable = true)
 |-- sepal_width: string (nullable = true)
 |-- petal_length: string (nullable = true)
 |-- petal_width: string (nullable = true)
 |-- species: string (nullable = true)



Apreciamos que para las cuatro primeras columnas, como tipo de dato tenemos **"string"** y observamos que debería ser **"float"**

In [5]:
# Casting de los datos string a float mediante estructura sql
df_casted = df.select(sql.col('sepal_length').cast('float'),
                    sql.col('sepal_width').cast('float'),
                    sql.col('petal_length').cast('float'),
                    sql.col('petal_width').cast('float')
                   )

df_casted.show(5)
df_casted.dtypes

+------------+-----------+------------+-----------+
|sepal_length|sepal_width|petal_length|petal_width|
+------------+-----------+------------+-----------+
|         5.1|        3.5|         1.4|        0.2|
|         4.9|        3.0|         1.4|        0.2|
|         4.7|        3.2|         1.3|        0.2|
|         4.6|        3.1|         1.5|        0.2|
|         5.0|        3.6|         1.4|        0.2|
+------------+-----------+------------+-----------+
only showing top 5 rows



[('sepal_length', 'float'),
 ('sepal_width', 'float'),
 ('petal_length', 'float'),
 ('petal_width', 'float')]

**Nota:** Se puede observar que los datos se ha convertido a *float* como se ha programado. Pero el dataframe solo tiene las columnas escogidas en sentencia *sql*. Es decir, se ha de escoger todas las columnas que se desea en el nuevo dataframe y aplicar cast a aquellas que se vea necesario.

In [6]:
df = df.select(sql.col('sepal_length').cast('float'),
               sql.col('sepal_width').cast('float'),
               sql.col('petal_length').cast('float'),
               sql.col('petal_width').cast('float'),
               sql.col('species')
              )

df.show(5)
df.dtypes

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|         5.1|        3.5|         1.4|        0.2| setosa|
|         4.9|        3.0|         1.4|        0.2| setosa|
|         4.7|        3.2|         1.3|        0.2| setosa|
|         4.6|        3.1|         1.5|        0.2| setosa|
|         5.0|        3.6|         1.4|        0.2| setosa|
+------------+-----------+------------+-----------+-------+
only showing top 5 rows



[('sepal_length', 'float'),
 ('sepal_width', 'float'),
 ('petal_length', 'float'),
 ('petal_width', 'float'),
 ('species', 'string')]

<div style="font-size:14px; text-align:justify"><b>4: </b>Cribado de valores.</div>

Busqueda de valores nulos para las columnas del dataset

In [7]:
df.select([sql.count(sql.when(sql.isnull(c), c)).alias(c) for c in df.columns]).show()

+------------+-----------+------------+-----------+-------+
|sepal_length|sepal_width|petal_length|petal_width|species|
+------------+-----------+------------+-----------+-------+
|           0|          0|           0|          0|      0|
+------------+-----------+------------+-----------+-------+



Como resultado se ve que es un dataset con todos los datos válidos.

<div style="font-size:14px; text-align:justify"><b>5: </b>Feature Engineering.</div>

En el ML, existe una serie de procesos que se va siguiendo hasta llegar al último estado de nuestros estudios. Estos pasos se va siguiendo de forma estructurada, uno tras otro.<br>
El concepto de **Pipeline** viene a trabajar con esta secuencia de pasos en nuestro dataset, cuya metodología de trabajo es aplicar los cambios a nuestro dataframe hasta llegar a ese último estado que deseamos.

**Fuente:** <a href="https://medium.com/@nutanbhogendrasharma/role-of-stringindexer-and-pipelines-in-pyspark-ml-feature-b79085bb8a6c">Role of stringindexer and pipelines in pyspark ml feature</a>

Trabajar con los valores indice para las *species*. Mediante el uso de ***StringIndexer*** se añade un índice numérico para cada entrada de especie que haya en la columna de specie.

In [8]:
# Valores de especie para el dataframe
[i.species for i in df.select('species').distinct().collect()]

                                                                                

['virginica', 'versicolor', 'setosa']

Tenemos tres tipos de especie, por tanto, obtendremos tres indices.

In [9]:
# Aplicar indexer
species_indexer = StringIndexer(inputCol='species', outputCol='species_index')

In [10]:
# Seleccionar las columnas de especie y eliminar valores repetidos.
df_sample = species_indexer.fit(df).transform(df)
df_sample = df_sample.select(sql.col('species'),
                               sql.col('species_index')
                              )

df_sample.dropDuplicates().show()

# df_species.dropDuplicates(['species','species_indexer']).show()



+----------+-------------+
|   species|species_index|
+----------+-------------+
|    setosa|          0.0|
| virginica|          2.0|
|versicolor|          1.0|
+----------+-------------+



                                                                                

Como se ve, se ha creado tantos indices como valroes de especie únicos hay. De este modo, todos los valores de *setosa* tendrán un index 0.0, *virginica* de 2.0 y *versicolor* de 1.0.

***OneHotEncoder*** permite generar un vector binario a partir de datos categoricos. <br>

Como es el caso de *irisDataset*, tenemos tres tipos de especies, por lo que para poder realizar algunas predicciones, necesitamos poder evaluarlo en base a datos binarios. Por tanto, requerimos de este paso. <br>

Como paso previo, se necesita hacer uso de *StringIndexer* ya que no se puede aplicar a columnas de string *OneHotEncoder*.

In [11]:
# Aplicar vectorizado binario
species_vectorize = OneHotEncoder(inputCol=species_indexer.getOutputCol(), outputCol='species_vector')

In [12]:
# Prueba para obtener columnas separadas de species y despues juntarlas en una con VectorAssembler
# Convertir a pandas
# df_pandas = df.toPandas()
# df_pandas = pd.get_dummies(df_pandas, drop_first=False)
# display(df_pandas.head())

# # Convertir a spark
# df_spark = spark.createDataFrame(df_pandas)
# df_spark.show(5)

***VectorAssembler*** es una funcion que nos permite agrupar en un único vector características de una serie de columnas de nuestro set de datos, muy útil para entrenar modelos de ML de *logistic regresion* o *decision trees*.<br>

Dicho lo cual, ahora se va a combinar los valores numéricos para las columnas de anchura y largura de petalo y sepalo.

In [13]:
# Columnas a agrupar en vector
input_cols = ['sepal_length', 'sepal_width', 'petal_length', 'petal_width']
# Aplicar vectorizado
values_vectorize = VectorAssembler(inputCols=input_cols, outputCol='values_vector')

Finalmente, crear nuestro modelo haciendo uso de **Pipeline**.

In [14]:
# Crear el dataframe con los Feature Engineering desarrollados
pipeline = Pipeline(stages=[species_indexer, species_vectorize, values_vectorize])
df_transformed = pipeline.fit(df).transform(df)
df_transformed.show(5)

+------------+-----------+------------+-----------+-------+-------------+--------------+--------------------+
|sepal_length|sepal_width|petal_length|petal_width|species|species_index|species_vector|       values_vector|
+------------+-----------+------------+-----------+-------+-------------+--------------+--------------------+
|         5.1|        3.5|         1.4|        0.2| setosa|          0.0| (2,[0],[1.0])|[5.09999990463256...|
|         4.9|        3.0|         1.4|        0.2| setosa|          0.0| (2,[0],[1.0])|[4.90000009536743...|
|         4.7|        3.2|         1.3|        0.2| setosa|          0.0| (2,[0],[1.0])|[4.69999980926513...|
|         4.6|        3.1|         1.5|        0.2| setosa|          0.0| (2,[0],[1.0])|[4.59999990463256...|
|         5.0|        3.6|         1.4|        0.2| setosa|          0.0| (2,[0],[1.0])|[5.0,3.5999999046...|
+------------+-----------+------------+-----------+-------+-------------+--------------+--------------------+
only showi

In [15]:
# Desordenar los valores ya que vienen ordenados en el fichero .csv
df_transformed = df_transformed.orderBy(sql.rand())
df_transformed.show(5)

+------------+-----------+------------+-----------+----------+-------------+--------------+--------------------+
|sepal_length|sepal_width|petal_length|petal_width|   species|species_index|species_vector|       values_vector|
+------------+-----------+------------+-----------+----------+-------------+--------------+--------------------+
|         5.6|        3.0|         4.1|        1.3|versicolor|          1.0| (2,[1],[1.0])|[5.59999990463256...|
|         5.7|        2.6|         3.5|        1.0|versicolor|          1.0| (2,[1],[1.0])|[5.69999980926513...|
|         6.5|        3.2|         5.1|        2.0| virginica|          2.0|     (2,[],[])|[6.5,3.2000000476...|
|         4.6|        3.6|         1.0|        0.2|    setosa|          0.0| (2,[0],[1.0])|[4.59999990463256...|
|         4.9|        3.1|         1.5|        0.1|    setosa|          0.0| (2,[0],[1.0])|[4.90000009536743...|
+------------+-----------+------------+-----------+----------+-------------+--------------+-----

In [16]:
df_transformed.printSchema()

root
 |-- sepal_length: float (nullable = true)
 |-- sepal_width: float (nullable = true)
 |-- petal_length: float (nullable = true)
 |-- petal_width: float (nullable = true)
 |-- species: string (nullable = true)
 |-- species_index: double (nullable = false)
 |-- species_vector: vector (nullable = true)
 |-- values_vector: vector (nullable = true)



***randomSplit*** nos permite separar el set de datos en un conjunto para entrenar nuestro modelo y un segundo para realizar la evaluación.

In [17]:
# Separar el set de datos en datos de entrenamiento y test
(train_df, test_df) = df_transformed.randomSplit([0.80,0.2])

***Decision Tree Classifier***

In [18]:
DTC = DecisionTreeClassifier(labelCol='species_index', featuresCol='values_vector', maxDepth=5)

In [19]:
pipeline = Pipeline(stages=[DTC])
model_DTC = pipeline.fit(train_df)

# Prediccion
pred_DTC= model_DTC.transform(test_df)
pred_DTC.printSchema()
pred_DTC.select(sql.col('sepal_length'),sql.col('sepal_width'),
               sql.col('petal_length'),sql.col('petal_width'),
               sql.col('species'),sql.col('prediction')).show()

# Precision
evaluator_DTC = MulticlassClassificationEvaluator(labelCol='species_index', predictionCol='prediction', metricName='accuracy')
accuracy_DTC = evaluator_DTC.evaluate(pred_DTC)
display('Precisión Decission Tree Classifier = ', accuracy_DTC)


root
 |-- sepal_length: float (nullable = true)
 |-- sepal_width: float (nullable = true)
 |-- petal_length: float (nullable = true)
 |-- petal_width: float (nullable = true)
 |-- species: string (nullable = true)
 |-- species_index: double (nullable = false)
 |-- species_vector: vector (nullable = true)
 |-- values_vector: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

+------------+-----------+------------+-----------+----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|prediction|
+------------+-----------+------------+-----------+----------+----------+
|         4.0|        0.9|        10.5|        0.4|    setosa|       2.0|
|         4.4|        3.2|         1.3|        0.2|    setosa|       0.0|
|         4.5|        2.3|         1.3|        0.3|    setosa|       0.0|
|         4.7|        3.2|         1.6|        0.2|    setosa|       0.0|

'Precisión Decission Tree Classifier = '

0.9032258064516129

***Gradient-boosted tree classiffier***

In [20]:
GBC = GBTClassifier(labelCol='species_vector', featuresCol='values_vector')

In [21]:
pipeline = Pipeline(stages=[GBC])
model_GBC = pipeline.fit(train_df)

# Prediccion
pred_GBC= model_DTC.transform(test_df)
pred_GBC.printSchema()
pred_GBC.select(sql.col('sepal_length'),sql.col('sepal_width'),
               sql.col('petal_length'),sql.col('petal_width'),
               sql.col('species'),sql.col('prediction')).show()

# Precision
evaluator = MulticlassClassificationEvaluator(labelCol='species_vector', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(pred_GBC)
display('Precisión Decission Tree Classifier = ', accuracy)


IllegalArgumentException: requirement failed: Column species_vector must be of type numeric but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.

Para la ejecución de ***Gradient-boosted tree classiffier*** se obtiene un error en el formato de la columna de label. Este algoritmo solo permite predicciones binarias, por lo que para conseguirlo se ha realizado **OneHotEncoder** como se ha explicado, pero el formato de columna no es aceptado. <br>

Visto esto, se ha probado a realizar una segregación de la columa *species* con **get_dummies** pasando por pandas y más adelante mediante **VectorAssembler** obtener una columa. Tras realizar el nuevo pipeline, a la hora de ejecutar el algoritmo, se obtiene el mismo error de tipo de columna.

***Random Forest Classiffier***

In [22]:
RFC = RandomForestClassifier(labelCol='species_index', featuresCol='values_vector')

In [23]:
pipeline = Pipeline(stages=[RFC])
model_RFC = pipeline.fit(train_df)

# Prediccion
pred_RFC= model_DTC.transform(test_df)
# pred_RFC.printSchema()
pred_RFC.select(sql.col('sepal_length'),sql.col('sepal_width'),
               sql.col('petal_length'),sql.col('petal_width'),
               sql.col('species'),sql.col('prediction')).show()

# Precision
evaluator_RFC = MulticlassClassificationEvaluator(labelCol='species_index', predictionCol='prediction', metricName='accuracy')
accuracy_RFC= evaluator_RFC.evaluate(pred_RFC)
display('Precisión Random Forest Classifier = ', accuracy_RFC)

+------------+-----------+------------+-----------+----------+----------+
|sepal_length|sepal_width|petal_length|petal_width|   species|prediction|
+------------+-----------+------------+-----------+----------+----------+
|         4.0|        0.9|        10.5|        0.4|    setosa|       2.0|
|         4.4|        3.2|         1.3|        0.2|    setosa|       0.0|
|         4.5|        2.3|         1.3|        0.3|    setosa|       0.0|
|         4.7|        3.2|         1.6|        0.2|    setosa|       0.0|
|         4.9|        2.5|         4.5|        1.7| virginica|       1.0|
|         4.9|        3.0|         1.4|        0.2|    setosa|       0.0|
|         4.9|        3.1|         1.5|        0.1|    setosa|       0.0|
|         4.9|        3.1|         1.5|        0.1|    setosa|       0.0|
|         5.1|        3.5|         1.4|        0.3|    setosa|       0.0|
|         5.4|        3.7|         1.5|        0.2|    setosa|       0.0|
|         5.5|        2.6|         4.4

'Precisión Random Forest Classifier = '

0.9032258064516129