#![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)

#Practica sobre cómo generar un flujo de ejecución en un problema de Machine Learning

Esta práctica simula un ejercicio completo de ETL (Extract-Transform-Load) junto a un análisis exploratorio de un dataset real, para posteriormente aplicar differentes algoritmos de aprendizaje automático que resuelvan un problema de regresión.

** This notebook covers: **
* *Parte 1: Conocimiento del dominio*
* *Parte 2: Extracción, transformación y carga [ETL] del dataset* 
* *Parte 3: Explorar los datos* 
* *Parte 4: Visualizar los datos* 
* *Parte 5: Preparar los datos* 
* *Parte 6: Modelar los datos* 
* *Parte 7: Ajustar y evaluar*
* *Parte 8: Propuesta de modelo altenativo* 

*Nuestro objetivo será predecir de la forma más exacta posible la energía generada por un conjunto de plantas eléctricas usando los datos generados por un conjunto de sensores.*


## Parte 1: Conocimiento del dominio

** Background **

La generación de energía es un proceso complejo, comprenderlo para poder predecir la potencia de salida es un elemento vital en la gestión de una planta energética y su conexión a la red. Los operadores de una red eléctrica regional crean predicciones de la demanda de energía en base a la información histórica y los factores ambientales (por ejemplo, la temperatura). Luego comparan las predicciones con los recursos disponibles (por ejemplo, plantas, carbón, gas natural, nuclear, solar, eólica, hidráulica, etc). Las tecnologías de generación de energía, como la solar o la eólica, dependen en gran medida de las condiciones ambientales, pero todas las centrales eléctricas son objeto de mantenimientos tanto planificados y como puntuales debidos a un problema.

En esta practica usaremos un ejemplo del mundo real sobre la demanda prevista (en dos escalas de tiempo), la demanda real, y los recursos disponibles de la red electrica de California: http://www.caiso.com/Pages/TodaysOutlook.aspx

![](http://content.caiso.com/outlook/SP/ems_small.gif)

El reto para un operador de red de energía es cómo manejar un déficit de recursos disponibles frente a la demanda real. Hay tres posibles soluciones a un déficit de energía: construir más plantas de energía base (este proceso puede costar muchos anos de planificación y construcción), comprar e importar de otras redes eléctricas regionales energía sobrante (esta opción puede ser muy cara y está limitado por las interconexiones entre las redes de transmisión de energía y el exceso de potencia disponible de otras redes), o activar pequeñas [plantas de pico](https://en.wikipedia.org/wiki/Peaking_power_plant). Debido a que los operadores de red necesitan responder con rapidez a un déficit de energía para evitar un corte del suministro, estos basan sus decisiones en una combinación de las dos últimas opciones. En esta práctica, nos centraremos en la última elección.

** La lógica de negocio **

Debido a que la demanda de energía solo supera a la oferta ocasionalmente, la potencia suministrada por una planta de energía pico tiene un precio mucho más alto por kilovatio hora que la energía generada por las centrales eléctricas base de una red eléctrica. Una planta pico puede operar muchas horas al día, o solo unas pocas horas al año, dependiendo de la condición de la red eléctrica de la región. Debido al alto coste de la construcción de una planta de energía eficiente, si una planta pico solo va a funcionar por un tiempo corto o muy variable, no tiene sentido económico para que sea tan eficiente como una planta de energía base. Además, el equipo y los combustibles utilizados en las plantas base a menudo no son adecuados para uso en plantas de pico.

La salida de potencia de una central eléctrica pico varía dependiendo de las condiciones ambientales, por lo que el problema de negocio a resolver se podría describir como _predecir la salida de potencia de una central eléctrica pico en función de la condiciones ambientales_  - ya que esto permitiría al operador de la red hacer compensaciones económicas sobre el número de plantas pico que ha de conectar en cada momento (o si por el contrario le interesa comprar energía más cara de otra red).

Una vez descrita esta lógica de negocio, primero debemos proceder a realizar un análisis exploratorio previo y trasladar el problema de negocio (predecir la potencia de salida en función de las condiciones medio ambientales) en un tarea de aprendizaje automático (ML). Por ejemplo, una tarea de ML que podríamos aplicar a este problema es la regresión, ya que tenemos un variable objetivo (dependiente) que es numérica. Para esto usaremos [Apache Spark ML Pipeline](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark-ml-package) para calcular dicha regresión.

Los datos del mundo real que usaremos en esta práctica se componen de 9.568 puntos de datos, cada uno con 4 atributos ambientales recogidos en una Central de Ciclo Combinado de más de 6 años (2006-2011), proporcionado por la Universidad de California, Irvine en [UCI Machine Learning Repository Combined Cycle Power Plant Data Set](https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant)). Para más detalles sobre el conjunto de datos visitar la página de la UCI, o las siguientes referencias:

* Pinar Tufekci, [Prediction of full load electrical power output of a base load operated combined cycle power plant using machine learning methods](http://www.journals.elsevier.com/international-journal-of-electrical-power-and-energy-systems/), International Journal of Electrical Power & Energy Systems, Volume 60, September 2014, Pages 126-140, ISSN 0142-0615.
* Heysem Kaya, Pinar Tufekci and Fikret S. Gurgen: [Local and Global Learning Methods for Predicting Power of a Combined Gas & Steam Turbine](http://www.cmpe.boun.edu.tr/~kaya/kaya2012gasturbine.pdf), Proceedings of the International Conference on Emerging Trends in Computer and Electronics Engineering ICETCEE 2012, pp. 13-18 (Mar. 2012, Dubai).

**Tarea a realizar durante la primera parte:**

Revisar la documentacion y referencias de:
* [Spark Machine Learning Pipeline](https://spark.apache.org/docs/latest/ml-guide.html#main-concepts-in-pipelines).
* [Databricks File System](https://docs.databricks.com/user-guide/dbutils.html).

## Parte 2: Extracción, transformación y carga [ETL] del dataset


Ahora que entendemos lo que estamos tratando de hacer, el primer paso consiste en cargar los datos en un formato que podemos consultar y utilizar fácilmente. Esto se conoce como ETL o "extracción, transformación y carga". Primero, vamos a cargar nuestro archivo de Amazon S3.

Nota: Como alternativa podemos subir nuestros datos utilizando "Databricks Menu> Tablas> Crear tabla", suponiendo que tengamos los archivos sin procesar en nuestro ordenador local.

Nuestros datos están disponibles en Amazon S3 en la siguiente ruta:

```
dbfs:/databricks-datasets/power-plant/data
```

==========================================================================================================================================================================================================


Empezaremos por visualizar una muestra de los datos. Para esto usaremos las funciones pre-definidas en los notebooks de Databricks para explorar su sistema de archivos. Estas utilidades son las llamadas `dbutils` para trabajar en el llamado Databricks File System. La función `dbutils.fs.ls` permite listar los contenidos de un directorio. 

`dbutils.fs` dispone de su propio help, esta ayuda nos será de gran utilidad cuando deseemos ver las diferentes funciones disponibles.

In [0]:
dbutils.fs.help()

Usar la función `display` y la función `dbutils.fs.ls` de Databricks para listar los ficheros del directorio S3 en el que se encuentran nuestros datos que usaremos en esta PEC.

In [0]:
#TODO: use display to list all the files of the directory containing the data
display(dbutils.fs.ls("/databricks-datasets/power-plant/data"))

path,name,size
dbfs:/databricks-datasets/power-plant/data/Sheet1.tsv,Sheet1.tsv,308693
dbfs:/databricks-datasets/power-plant/data/Sheet2.tsv,Sheet2.tsv,308693
dbfs:/databricks-datasets/power-plant/data/Sheet3.tsv,Sheet3.tsv,308693
dbfs:/databricks-datasets/power-plant/data/Sheet4.tsv,Sheet4.tsv,308693
dbfs:/databricks-datasets/power-plant/data/Sheet5.tsv,Sheet5.tsv,308693


Ahora, usaremos el comando `dbutils.fs.head` y la función `print` para ver los primeros 65,536 bytes del primer archivo del directorio: `Sheet1.tsv`

In [0]:
#TODO: print the first 65,536 bytes of the file Sheet1.tsv
print (dbutils.fs.head("/databricks-datasets/power-plant/data/Sheet1.tsv",65536))

==========================================================================================================================================================================================================

Ahora usaremos PySpark para visualizar las 5 primeras líneas de los datos

*Hint*: Primero crea un RDD a partir de los datos usando [`sc.textFile("dbfs:/databricks-datasets/power-plant/data")`](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.SparkContext.textFile).

*Hint*: Luego piensa como usar el RDD creado para mostrar datos, el método [`take()`](https://spark.apache.org/docs/1.6.2/api/python/pyspark.html#pyspark.RDD.take) puede ser una buena opción a considerar.

In [0]:
# TODO: Load the data and print the first five lines.
rawTextRdd = sc.textFile("dbfs:/databricks-datasets/power-plant/data")
rawTextRdd.take(5)


A partir nuestra exploración inicial de una muestra de los datos, podemos hacer varias observaciones sobre el proceso de ETL:
- Los datos son un conjunto de .tsv (archivos con valores separados Tab) (es decir, cada fila de datos se separa mediante tabuladores)
- Hay una fila de cabecera, que es el nombre de las columnas
- Parece que el tipo de los datos en cada columna es constante (es decir, cada columna es de tipo double)

El esquema de datos que hemos obtenido de UCI es:
- AT = Atmospheric Temperature in C
- V = Exhaust Vacuum Speed
- AP = Atmospheric Pressure
- RH = Relative Humidity
- PE = Power Output.  Esta es la variable dependiente que queremos predecir usando los otras cuatro

Para usar el paquete Spark CSV [spark-csv](https://spark-packages.org/package/databricks/spark-csv), usaremos el método [sqlContext.read.format()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.format) para especificar el formato de la fuente de datos de entrada: `'com.databricks.spark.csv'`

Podemos especificar diferentes opciones de como importar los datos usando el método [options()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.options). Encontramos las opciones disponible en la documentación de GitHub del paquete [aquí](https://github.com/databricks/spark-csv#features).

Usaremos las siguientes opciones:
- `delimiter='\t'` porque nuestros datos se encuentran delimitados por tabulaciones
- `header='true'` porque nuestro dataset tiene una fila que representa la cabecera de los datos
- `inferschema='true'` porque creemos que todos los datos son números reales, por lo tanto la librería puede inferir el tipo de cada columna de forma automática.

El ultimo componente necesario para crear un DataFrame es determinar la ubicación de los datos usando el método [load()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.load): `"/databricks-datasets/power-plant/data"`

Juntando todo, usaremos la siguiente operación:

`sqlContext.read.format().options().load()`

Crear un DataFrame a partir de los datos.
- El formato es csv

En el campo opciones incluiremos 3, formadas por nombre de opción y valor, separadas por coma.
- El separador es el tabulador
- El fichero contiene cabecera 'header'
- Para crear un dataframe necesitamos un esquema (schema). A partir de los datos Spark puede tratar de inferir el esquema, le diremos 'true'.

NOTA: [Ayuda] https://docs.databricks.com/spark/latest/data-sources/read-csv.html

El directorio a cargar es el especificado anteriormente. Es importante indicarle a Spark que es una ubicación ya montada en el sistema dbfs, como se ha mostrado en el ejercicio 2a.

In [0]:


powerPlantDF = sqlContext.read.format("com.databricks.spark.csv").options(delimiter='\t', header='true', inferschema='true' ).load("/databricks-datasets/power-plant/data")

Vamos a comprobar los tipos de las columnas usando el metodo [dtypes](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dtypes).

In [0]:
powerPlantDF.printSchema()

Tambien podemos examinar los datos usando el metodo `display()`.

In [0]:
powerPlantDF.show()

Ahora en lugar de usar [spark-csv](https://spark-packages.org/package/databricks/spark-csv) para inferir los tipos de las columnas, especificaremos el esquema como [DataType](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.DataType), el cual es una lista de [StructField](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.StructType).

La lista completa de tipos se encuentra en el modulo [pyspark.sql.types](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.types). Para nuestros datos, usaremos [DoubleType()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.types.DoubleType).

Por ejemplo, para especificar cual es el nombre de la columna usaremos: `StructField(`_name_`,` _type_`, True)`. (El tercer parámetro, `True`, significa que permitimos que la columna tenga valores null.)

Crea un esquema a medida para el dataset.

In [0]:
# TO DO: Fill in the custom schema.
from pyspark.sql.types import *

# Custom Schema for Power Plant
customSchema = StructType([ \
    StructField("AT",DoubleType(),True), \
    StructField("V",DoubleType(),True), \
    StructField("AP",DoubleType(),True), \
    StructField("RH",DoubleType(),True), \
    StructField("PE",DoubleType(),True) \
                          ])

Ahora, usaremos el esquema que acabamos de crear para leer los datos. Para realizar esta operación, modificaremos el paso anterior `sqlContext.read.format`. Podemos especificar el esquema haciendo:
- Anadir `schema = customSchema` al método load (simplemente anadelo usando una coma justo después del nombre del archivo)
- Eliminado la opción `inferschema='true'` ya que ahora especificamos el esquema que han de seguir los datos

In [0]:
# TODO: Use the schema you created above to load the data again.
altPowerPlantDF = sqlContext.read.format("com.databricks.spark.csv").options(delimiter='\t', header='true', inferschema='true').load("/databricks-datasets/power-plant/data",schema = customSchema)

Es importante darse cuenta que esta vez no se ha ejecutado ningún job de Spark. Esto se debe a que hemos especificado el esquema, por tanto el paquete [spark-csv](https://spark-packages.org/package/databricks/spark-csv) no tiene por qué leer los datos para inferir el esquema. Podemos usar el método [dtypes](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.dtypes) para examinar el nombre y el tipo de los atributos del dataset. Estos deberían ser idénticos a los que hemos inferido anteriormente de los datos.

Cuando ejecutes la siguiente celda, los datos no deberían leerse.

In [0]:
print altPowerPlantDF.dtypes

Ahora podemos examinar los datos utilizando el método display(). * Ten en cuenta que esta operación hará que los datos que se lean y se creara el DataFrame. *

In [0]:
display(altPowerPlantDF)

AT,V,AP,RH,PE
14.96,41.76,1024.07,73.17,463.26
25.18,62.96,1020.04,59.08,444.37
5.11,39.4,1012.16,92.14,488.56
20.86,57.32,1010.24,76.64,446.48
10.82,37.5,1009.23,96.62,473.9
26.27,59.44,1012.23,58.77,443.67
15.89,43.96,1014.02,75.24,467.35
9.48,44.71,1019.12,66.43,478.42
14.64,45.0,1021.78,41.25,475.98
11.74,43.56,1015.14,70.72,477.5


## Parte 3: Explorar tus Datos

Ahora que ya hemos cargado los datos, el siguiente paso es explorarlos y realizar algunos análisis y visualizaciones básicas.

Este es un paso que siempre se debe realizar **antes de** intentar ajustar un modelo a los datos, ya que este paso muchas veces nos permitirá conocer una gran información sobre los datos.

En primer lugar vamos a registrar nuestro DataFrame como una tabla de SQL llamado `power_plant`. Debido a que es posible que repitas esta práctica varias veces, vamos a tomar la precaución de eliminar cualquier tabla existente en primer lugar.

Podemos eliminar cualquier tabla SQL existente `power_plant` usando el comando SQL:` DROP TABLE IF EXISTS power_plant` (también debemos que eliminar todos los ficheros asociados a la tabla, lo que podemos hacer con una operación de sistema de archivos Databricks).

Una vez ejecutado el paso anterior, podemos registrar nuestro DataFrame como una tabla de SQL usando [sqlContext.registerDataFrameAsTable()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.registerDataFrameAsTable).

In [0]:
sqlContext.sql("DROP TABLE IF EXISTS power_plant")
dbutils.fs.rm("dbfs:/user/hive/warehouse/power_plant", True)
sqlContext.registerDataFrameAsTable(powerPlantDF, "power_plant")

Ahora que nuestro DataFrame existe como una tabla SQL, podemos explorarlo utilizando comandos SQL.

Para ejecutar SQL en una celda, utilizamos el operador `%sql`. La celda siguiente es un ejemplo del uso de SQL para consultar las filas de la tabla de SQL.

**NOTE**: `%sql` es una sentencia que solo funciona en los notebooks de Databricksis. Este ejecuta `sqlContext.sql()` y pasa los resultados a la función `display()`. Estas dos sentencias son equivalentes:

`%sql SELECT * FROM power_plant`

`display(sqlContext.sql("SELECT * FROM power_plant"))`

In [0]:
sqlContext.sql("SELECT * FROM power_plant").show()

In [0]:
%sql
-- We can use %sql to query the rows
SELECT * FROM power_plant

AT,V,AP,RH,PE
14.96,41.76,1024.07,73.17,463.26
25.18,62.96,1020.04,59.08,444.37
5.11,39.4,1012.16,92.14,488.56
20.86,57.32,1010.24,76.64,446.48
10.82,37.5,1009.23,96.62,473.9
26.27,59.44,1012.23,58.77,443.67
15.89,43.96,1014.02,75.24,467.35
9.48,44.71,1019.12,66.43,478.42
14.64,45.0,1021.78,41.25,475.98
11.74,43.56,1015.14,70.72,477.5


Usa el comando de SQL `desc` para describir el esquema ejecutando la siguiente celda.

In [0]:
%sql
desc power_plant

col_name,data_type,comment
AT,double,
V,double,
AP,double,
RH,double,
PE,double,


**Definición de Esquema**

Una vez más, nuestro esquema es el siguiente:

- AT = Atmospheric Temperature in C
- V = Exhaust Vacuum Speed
- AP = Atmospheric Pressure
- RH = Relative Humidity
- PE = Power Output

PE es nuestra variable objetivo. Este es el valor que intentamos predecir usando las otras mediciones.

*Referencia [UCI Machine Learning Repository Combined Cycle Power Plant Data Set](https://archive.ics.uci.edu/ml/datasets/Combined+Cycle+Power+Plant)*

Podemos obtener el DataFrame asociado a una tabla SQL usando el método [sqlContext.table()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.table) pasando como argumento el nombre de la tabla SQL.

Ahora vamos a realizar un análisis estadístico básico de todas las columnas.

Calculad y mostrad los resultados en modo tabla (la función `display` os puede ser de ayuda):
* Número de registros en nuestros datos
* Media de cada columna
* Máximo y mínimo de cada columna
* Desviación estándar de cada columna

Hint: Revisad [DataFrame](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame) ya que contiene métodos que permiten realizar dichos cálculos de manera sencilla.

In [0]:
powerPlantDF.describe().show()

In [0]:
# Visualización de datos estadísticos básicos de nuestro dataset
df = sqlContext.table("power_plant")
display(df.summary("count", "mean", "max", "min", "stddev"))

summary,AT,V,AP,RH,PE
count,47840.0,47840.0,47840.0,47840.0,47840.0
mean,19.651231187291,54.30580372073594,1013.2590781772572,73.30897784280918,454.3650094063551
max,37.11,81.56,1033.3,100.16,495.76
min,1.81,25.36,992.89,25.56,420.26
stddev,7.452161658340004,12.707361709685806,5.938535418520816,14.599658352081477,17.06628146683769


## <font color="blue">Parte 4: Visualizar los datos</font>

Para entender nuestros datos, intentamos buscar correlaciones entre las diferentes características y sus correspondientes etiquetas. Esto puede ser importante cuando seleccionamos un modelo. Por ejemplo, si una etiqueta y sus características se correlacionan de forma lineal, un modelo de regresión lineal obtendrá un buen rendimiento; por el contrario si la relación es no lineal, modelos más complejos, como arboles de decisión pueden ser una mejor opción. Podemos utilizar herramientas de visualización para observar cada uno de los posibles predictores en relación con la etiqueta como un gráfico de dispersión para ver la correlación entre ellos.

==========================================================================================================================================================================================================

** Añade las siguientes figuras: **
Vamos a ver si hay una correlación entre la temperatura y la potencia de salida. Podemos utilizar una consulta SQL para crear una nueva tabla que contenga solo el de temperatura (AT) y potencia (PE), y luego usar un gráfico de dispersión con la temperatura en el eje X y la potencia en el eje Y para visualizar la relación (si la hay) entre la temperatura y la energía.

Realiza los siguientes pasos:

- Ejecuta la siguiente celda
- Haz clic en el menú desplegable junto al icono de "Bar Chart" y selecciona "Scatter" para convertir la tabla en un gráfico de dispersión
- Haz click en "Plot Options..."
- En la caja de valores, haz clic en "Temperature" y arrástralo antes de "Power"
- Aplicar los cambios haciendo clic en el botón "Apply"
- Aumentar el tamaño del grafico haciendo clic y arrastrando el control del tamaño

In [0]:

display(sqlContext.sql("select RH,PE from power_plant"))

RH,PE
73.17,463.26
59.08,444.37
92.14,488.56
76.64,446.48
96.62,473.9
58.77,443.67
75.24,467.35
66.43,478.42
41.25,475.98
70.72,477.5


Parece que hay una gran correlación entre temperatura y power output. Esta correlación es esperable gracias a la segunda ley de la termodinamica [thermal efficiency](https://en.wikipedia.org/wiki/Thermal_efficiency). Ir más allá en este análisis queda fuera del ámbito de esta práctica.

Usa una sentencia SQL para crear un gráfico de dispersión entre las variables Power (PE) y Exhaust Vacuum Speed (V).

In [0]:
%sql

select PE,V from power_plant

PE,V
463.26,41.76
444.37,62.96
488.56,39.4
446.48,57.32
473.9,37.5
443.67,59.44
467.35,43.96
478.42,44.71
475.98,45.0
477.5,43.56


Ahora vamos a repetir este ejercicio con el resto de variables y la etiqueta Power Output.

Usa una sentencia SQL para crear un gráfico de dispersión entre las variables Power (PE) y Pressure (AP).

In [0]:
%sql
-- TO DO: Replace <FILL_IN> with the appropriate SQL command.
select AP,PE from power_plant

AP,PE
1024.07,463.26
1020.04,444.37
1012.16,488.56
1010.24,446.48
1009.23,473.9
1012.23,443.67
1014.02,467.35
1019.12,478.42
1021.78,475.98
1015.14,477.5


Usa una sentencia SQL para crear un gráfico de dispersión entre las variables Power (PE) y Humidity (RH).

In [0]:
%sql
-- TO DO: Replace <FILL_IN> with the appropriate SQL command.
select RH,PE from power_plant

RH,PE
73.17,463.26
59.08,444.37
92.14,488.56
76.64,446.48
96.62,473.9
58.77,443.67
75.24,467.35
66.43,478.42
41.25,475.98
70.72,477.5


##Parte 5: Preparación de los datos

El siguiente paso es preparar los datos para aplicar la regresión. Dado que todo el dataset es numérico y consistente, esta será una tarea sencilla y directa.

El objetivo es utilizar el método de regresión para determinar una función que nos de la potencia de salida como una función de un conjunto de características de predicción. El primer paso en la construcción de nuestra regresión es convertir las características de predicción de nuestro DataFrame a un vector de características utilizando el método [pyspark.ml.feature.VectorAssembler()](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.feature.VectorAssembler).

El VectorAssembler es una transformación que combina una lista dada de columnas en un único vector. Esta transformación es muy útil cuando queremos combinar características en crudo de los datos con otras generadas al aplicar diferentes funciones sobre los datos en un único vector de características. Para integrar en un único vector toda esta información antes de ejecutar un algoritmo de aprendizaje automático, el VectorAssembler toma una lista con los nombres de las columnas de entrada (lista de strings) y el nombre de la columna de salida (string).

==========================================================================================================================================================================================================
### Ejercicio 5

- Leer la documentación y los ejemplos de uso de [VectorAssembler](https://spark.apache.org/docs/latest/ml-features.html#vectorassembler)
- Convertir la tabla SQL `power_plant` en un `dataset` llamado datasetDF
- Establecer las columnas de entrada del VectorAssember: `["AT", "V", "AP", "RH"]`
- Establecer la columnas de salida como `"features"`

In [0]:
# TODO: Replace <FILL_IN> with the appropriate code
from pyspark.ml.feature import VectorAssembler
# PCA, 
datasetDF = sqlContext.table("power_plant")

vectorizer = VectorAssembler()
vectorizer.setInputCols(["AT", "V", "AP", "RH"])
vectorizer.setOutputCol("features")

##Parte 6: Modelar los datos

Ahora vamos a modelar nuestros datos para predecir que potencia de salida se dara cuando tenemos una serie de lecturas de los sensores

La API de [Apache Spark MLlib](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml) ofrece diferentes implementaciones de técnicas de regresion para modelar datasets. En este caso usaremos una muy popular, el llamado [Random Forest](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.RandomForestRegressor). Se basa en la combinación de varios [Arboles de Decisión](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.DecisionTreeRegressor), otra técnica de regresión. En este caso, Random Forest combina una cantidad grande de árboles de decisión independientes en la fase de entrenamiento probados sobre conjuntos de datos aleatorios con igual distribución.
 El objetivo es utilizarse para adaptar un modelo predictivo a un conjunto de datos observados \\(y\\) y \\(X\\). Despues de desarrollar un modelo de este tipo, dado un cierto valor  \\( X\\) del que no conocemos su valor de \\(y \\), el modelo ajustado se puede utilizarse para hacer una prediccion del valor del posible valor \\(y \\). En este caso, queremos predecir la potencia de salida.

NOTA: Animamos a los alumnos a explorar las diferentes técnicas de regresión disponibles en la [API ML de Spark](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.classification)

Necesitamos una forma de evaluar como de bien nuestro modelo predice la produccion de potencia en funcion de parametros de entrada. Podemos hacer esto mediante la division de nuestros datos iniciales establecidos en un _Training set_ utilizado para entrenar a nuestro modelo y un _Test set_ utilizado para evaluar el rendimiento de nuestro modelo. Podemos usar el metodo nativo de los DataFrames [randomSplit()](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) para dividir nuestro dataset. El método toma una lista de pesos y una semilla aleatoria opcional. La semilla se utiliza para inicializar el generador de numeros aleatorios utilizado por la funcion de division.

==========================================================================================================================================================================================================

Utiliza el método [randomSplit()](https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.randomSplit) para dividir `datasetDF` en trainingSetDF (80% del DataFrame de entrada) y testSetDF (20% del DataFrame de entrada), para poder reproducir siempre el mismo resultado, usar la semilla 1800009193L. Finalmente, cachea (cache()) cada datafrane en memoria para maximizar el rendimiento.

In [0]:
# TODO: Replace <FILL_IN> with the appropriate code.
# We'll hold out 20% of our data for testing and leave 80% for training
seed = 1800009193
(split20DF, split80DF) = datasetDF.randomSplit([0.2,0.8],seed)

# Let's cache these datasets for performance
testSetDF = split20DF.cache()
trainingSetDF = split80DF.cache()

A continuacion vamos a crear nuestro modelo y utilizar su ayda para entender como entrenarlo. Ver la API de [Random Forest Regression](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.RandomForestRegressor) para mas detalles.

Ejecuta la siguiente celda:

In [0]:
# ************ RANDOM FOREST ********************
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

rf = RandomForestRegressor()

print(rf.explainParams())


La siguiente celda esta basada en [Spark ML Pipeline API for Random Forest Regression](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.RandomForestRegressor).

Ahora, crearemos el [ML Pipeline](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Pipeline) (flujo de ejecución) y estableceremos las fases del pipeline como vectorizar y posteriormente aplicar el regresor que hemos definido.

Finalmente, crearemos el modelo entrenándolo con el DataFrame `trainingSetDF`.

El primer paso es establecer los valores de los parametros:
- Define el nombre de la columna a donde guardaremos la prediccion como "Predicted_PE"
- Define el nombre de la columna que contiene la etiqueta como "PE"
- Definimos el numero de arboles de decisión utilizados
- Definimos el grado de profundidad de la estructura en árbol

In [0]:
## TODO: Replace <FILL_IN> with the appropriate code
# Now we set the parameters for the method
rf.setPredictionCol("Predicted_PE")\
  .setLabelCol("PE")\
  .setMaxDepth(5)\
  .setNumTrees(20)
  
# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[vectorizer, rf])

# Train model.  This also runs the indexer.
rfmodel = pipeline.fit(trainingSetDF)

In [0]:
# DISPLAY DETALLES DEL ARBOL CREADO

print("Nodos: " + str(rfmodel.stages[-1]._java_obj.parent().getNumTrees()))
print("Profundidad: "+ str(rfmodel.stages[-1]._java_obj.parent().getMaxDepth()))  # summary only

print(rfmodel.stages[-1]._java_obj.toDebugString())  # summary only

Ahora estudiaremos cómo se comportan nuestras predicciones en este modelo. Aplicamos nuestro modelo de regresión lineal para el 20% de los datos que hemos separado del conjunto de datos de entrada. La salida del modelo será una columna de producción de electricidad teórica llamada "Predicted_PE".

- Ejecuta la siguiente celda
- Desplázate por la tabla de resultados y observa como los valores de la columna de salida de corriente (PE) se comparan con los valores correspondientes en la salida de potencia predicha  (Predicted_PE)

In [0]:
# Apply our RF model to the test data and predict power output
# Make predictions.
predictions = rfmodel.transform(testSetDF)
display(predictions)

AT,V,AP,RH,PE,features,Predicted_PE
1.81,39.42,1026.92,76.97,490.55,"List(1, 4, List(), List(1.81, 39.42, 1026.92, 76.97))",482.8756341829071
3.2,41.31,997.67,98.84,489.86,"List(1, 4, List(), List(3.2, 41.31, 997.67, 98.84))",481.7557416661863
3.38,41.31,998.79,97.76,489.11,"List(1, 4, List(), List(3.38, 41.31, 998.79, 97.76))",481.7557416661863
3.4,39.64,1011.1,83.43,459.86,"List(1, 4, List(), List(3.4, 39.64, 1011.1, 83.43))",483.4819899730006
3.51,35.47,1017.53,86.56,489.07,"List(1, 4, List(), List(3.51, 35.47, 1017.53, 86.56))",483.6194573071565
3.63,38.44,1016.16,87.38,487.87,"List(1, 4, List(), List(3.63, 38.44, 1016.16, 87.38))",483.6194573071565
3.91,35.47,1016.92,86.03,488.67,"List(1, 4, List(), List(3.91, 35.47, 1016.92, 86.03))",483.6194573071565
3.94,39.9,1008.06,97.49,488.81,"List(1, 4, List(), List(3.94, 39.9, 1008.06, 97.49))",482.5762174833853
4.0,39.9,1009.64,97.16,490.79,"List(1, 4, List(), List(4.0, 39.9, 1009.64, 97.16))",482.2814433879184
4.15,39.9,1007.62,95.69,489.8,"List(1, 4, List(), List(4.15, 39.9, 1007.62, 95.69))",483.2061550864569


A partir de una inspección visual de las predicciones, podemos ver que están cerca de los valores reales.

Sin embargo, nos gustaría disponer de una medida científica exacta de la bondad del modelo. Para realizar esta medición, podemos utilizar una métrica de evaluación como la [Error cuadrático medio](https://en.wikipedia.org/wiki/Root-mean-square_deviation) (RMSE) para validar nuestro modelo.

RSME se define como: \\( RMSE = \sqrt{\frac{\sum_{i = 1}^{n} (x_i - y_i)^2}{n}}\\) donde \\(y_i\\) es el valor observado \\(x_i\\) es el valor predicho

RMSE es una medida muy habitual para calcular las diferencias entre los valores predichos por un modelo o un estimador y los valores realmente observados. Cuanto menor sea el RMSE, mejor será nuestro modelo.

Spark ML Pipeline proporciona diferentes métricas para evaluar modelos de regresión, incluyendo [RegressionEvaluator()](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator).

Después de crear una instancia de [RegressionEvaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator), fijaremos el nombre de la columna objetivo "PE" y  el nombre de la columna de predicción a "Predicted_PE". A continuación, invocaremos el evaluador en las predicciones.

In [0]:
# Now let's compute an evaluation metric for our test dataset
from pyspark.ml.evaluation import RegressionEvaluator

# Create an RMSE evaluator using the label and predicted columns
regEval = RegressionEvaluator(predictionCol="Predicted_PE", labelCol="PE", metricName="rmse")

# Run the evaluator on the DataFrame
rmse = regEval.evaluate(predictions)

print("Root Mean Squared Error: %.2f" % rmse)

Otra medida de evaluación estadística muy útil es el coeficiente de determinación, que se denota \\(R ^ 2 \\) o \\(r ^ 2\\) y pronunciado "R cuadrado". Es un número que indica la proporción de la variación en la variable dependiente que es predecible a partir de las variables independientes y proporciona una medida de lo bien que los resultados observados son replicados por el modelo, basado en la proporción de la variación total de los resultados explicada por el modelo. El coeficiente de determinación va de 0 a 1 (más cerca de 1), y cuanto mayor sea el valor, mejor es nuestro modelo.


Para calcular \\(r^2\\), hemos de ejecutar el evaluador `regEval.metricName: "r2"`

Vamos a calcularlo ejecutando la celda siguiente.

In [0]:
# Now let's compute another evaluation metric for our test dataset
r2 = regEval.evaluate(predictions, {regEval.metricName: "r2"})

print("r2: {0:.2f}".format(r2))

##Parte 7: Ajustar y evaluar

Ahora que tenemos un primer modelo bastante bueno vamos a tratar de hacer uno aun mejor ajustando sus parametros. El proceso de ajustar un modelo se conoce como [Model Selection](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.tuning) o [Hyperparameter Tuning](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.tuning). Spark ML Pipeline hace que el proceso de ajuste sea sencillo.

Spark ML Pipeline soporta la seleccion de modelos usando herramientas herramientas como el [CrossValidator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator), que requiere los siguientes elementos:
- [Estimator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Estimator): un algoritmo o un pipeline a ajustar
- [Conjunto de ParamMaps](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder): parametros para elegir, tambien conocido como _parameter grid_
- [Evaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.Evaluator): metrica para medir que tan bien lo hace un modelo sobre los datos de entrenamiento

A un alto nivel, las herramientas de seleccion de modelos, tales como [CrossValidator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) trabajan de la siguiente manera:

- Se separaran los datos de entrada en dos conjuntos entrenamiento y test.
- Para cada uno de estos pares (entrenamiento, test), hay iterar a traves del conjunto de ParamMaps:
    - Para cada [ParamMap](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder), se ajusta el [Estimador](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.Estimator) usando dichos parametros, se obtiene el modelo ajustado, y se evaluar su rendimiento usando el [Evaluator] (https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.Evaluator).
    - Seleccionan el mejor modelo producido por el conjunto de parametros.

El [Evaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.Evaluator) puede ser por ejemplo un [RegressionEvaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator) para problemas de regresion. Como ayuda a construir el conjunto de parametros, los usuarios pueden utilizar la utilidad [ParamGridBuilder](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder).

Ten en cuenta que la validacion cruzada sobre una conjunto grande de parametros es costosa.

En el siguiente apartado llevaremos a cabo los siguientes pasos:
- Crear un [CrossValidator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) utilizando un pipeline y un [RegressionEvaluator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.RegressionEvaluator) que hemos creado anteriormente, y establecer el numero de pliegues (folds) a 5
- Crear una lista de 10 valores distintos para la profundidad de nuestro 'tree'.
  - Crear una lista de 10 parametros de numero de trees.
- Usar [ParamGridBuilder](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.ParamGridBuilder) para construir un conjunto de parametros con los parametros de profundidad y numero de árboles y anadir dicho conjunto al [CrossValidator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator)
- Ejecutar el [CrossValidator](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.tuning.CrossValidator) para encontrar los parametros que producen el mejor modelo (es decir, mas bajo RMSE) y devolver el mejor modelo.

In [0]:
from pyspark.ml.tuning import *
rf2 = RandomForestRegressor()
# Now we set the parameters for the method
rf2.setPredictionCol("Predicted_PE")\
   .setLabelCol("PE")
#  .setMaxDepth(5)\
#  .setNumTrees(20).baseOn([setPredictionCol,"Predicted_PE"], [setLabelCol,"PE"])
# Chain indexer and forest in a Pipeline
pipeline = Pipeline(stages=[vectorizer, rf2])

grid = ParamGridBuilder().addGrid(rf2.maxDepth,[3,5,7,10,12]).addGrid(rf2.numTrees,[15,20,22,30,32]).build()

evaluator = RegressionEvaluator(predictionCol="Predicted_PE", labelCol="PE", metricName="rmse")

cv = CrossValidator(estimator = pipeline, estimatorParamMaps = grid ,evaluator = evaluator, numFolds = 5)

cvModel = cv.fit(trainingSetDF)


In [0]:
print("Nodos: " + str(cvModel.bestModel.stages[-1]._java_obj.parent().getNumTrees()))
print("Profundidad: "+ str(cvModel.bestModel.stages[-1]._java_obj.parent().getMaxDepth()))  # summary only


In [0]:
# averiguamos cuales son los parámetros para determinar el mejor modelo.
print(cvModel.bestModel.stages[1].explainParam('maxDepth'))
print(cvModel.bestModel.stages[1].explainParam('numTrees'))


In [0]:
predictions2 = cvModel.transform(testSetDF)
display(predictions2)


In [0]:
rmse2 = evaluator.evaluate(predictions2)
print("Root Mean Squared Error: %.2f" % rmse2)

##Parte 8: Propuesta de modelo lineal

En base a lo desarrollado en apartados anteriores, tomad una técnica de [regresión de las que Spark](https://spark.apache.org/docs/latest/ml-classification-regression.html) proporciona y aplicadla al problema (Regresión Lineal, Decision Tree, etc.)
Podéis re-utilizar gran parte del código que ya habéis escrito para realizar el modelado del ejercicio 6 en adelante. Para dicho modelo, calcula RSME y el porcentaje de valores que caen en 1 RMSE y 2 RMSE.

In [0]:
from pyspark.ml.regression import *
from pyspark.ml import Pipeline

# probamos la Regresión lineal
lr = LinearRegression(featuresCol="features", labelCol="PE", predictionCol="Predicted_PE", maxIter=500, regParam=0.3, elasticNetParam=0.8)

#lr.setParams(featuresCol="features", labelCol="PE", predictionCol="Predicted_PE", maxIter=10, regParam=0.3, elasticNetParam=0.8, tol=1e-6, fitIntercept=True, #standardization=True, solver="auto", weightCol=None, aggregationDepth=2, loss="squaredError", epsilon=1.35)

pipeline = Pipeline(stages=[vectorizer, lr])                      
# Entrenamos el modelo.  
lrmodel  = pipeline.fit(trainingSetDF)


In [0]:
# Calculamos las predicciones
lrprediction = lrmodel.transform(testSetDF)

display(lrprediction)

AT,V,AP,RH,PE,features,Predicted_PE
2.34,39.42,1028.47,69.68,490.34,"Map(vectorType -> dense, length -> 4, values -> List(2.34, 39.42, 1028.47, 69.68))",491.7458929341162
2.8,39.64,1011.01,82.96,482.66,"Map(vectorType -> dense, length -> 4, values -> List(2.8, 39.64, 1011.01, 82.96))",488.1455452518717
3.82,35.47,1016.62,84.34,489.04,"Map(vectorType -> dense, length -> 4, values -> List(3.82, 35.47, 1016.62, 84.34))",487.5670288921728
3.98,35.47,1017.22,86.53,489.64,"Map(vectorType -> dense, length -> 4, values -> List(3.98, 35.47, 1017.22, 86.53))",487.0665024880505
4.23,38.44,1016.46,76.64,489.0,"Map(vectorType -> dense, length -> 4, values -> List(4.23, 38.44, 1016.46, 76.64))",486.8894458475153
4.32,35.47,1017.8,88.51,488.03,"Map(vectorType -> dense, length -> 4, values -> List(4.32, 35.47, 1017.8, 88.51))",486.2545038115147
4.43,38.91,1019.04,88.17,491.9,"Map(vectorType -> dense, length -> 4, values -> List(4.43, 38.91, 1019.04, 88.17))",485.2832355553721
4.65,35.19,1018.23,94.78,489.36,"Map(vectorType -> dense, length -> 4, values -> List(4.65, 35.19, 1018.23, 94.78))",485.042656055323
4.78,42.85,1013.39,93.36,481.47,"Map(vectorType -> dense, length -> 4, values -> List(4.78, 42.85, 1013.39, 93.36))",482.643389290282
4.87,42.85,1012.69,94.72,482.05,"Map(vectorType -> dense, length -> 4, values -> List(4.87, 42.85, 1012.69, 94.72))",482.2759964866141


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
eval_lr = RegressionEvaluator(labelCol="PE", predictionCol="Predicted_PE", metricName="rmse")

# Root Mean Square Error
rmse_lr = eval_lr.evaluate(lrprediction)
print("RMSE: %.3f" % rmse_lr)

r2_lr = eval_lr.evaluate(lrprediction, {eval_lr.metricName: "r2"})
print("r2: %.3f" %r2_lr)

In [0]:
print("Model lineal: PE es funció de AT, V, AP, RH : Coeficients" + str(lrmodel.stages[1].coefficients) + " terme independent " + str(lrmodel.stages[1].intercept ))