# ML Students PySpark

___
## Introducción

Para la realización de esta tarea se utilizará el [dataset](https://archive.ics.uci.edu/ml/machine-learning-databases/00320/student.zip) que contiene las notas y otras características de alumnos en las asignaturas de matemáticas. [Aquí](https://archive.ics.uci.edu/ml/datasets/Student+Performance) se puede obtener una descripción detallada de todos los atributos. Como alternativa, a continuación se puede consultar estos atributos:

1. school - student's school (binary: "GP" - Gabriel Pereira or "MS" - Mousinho da Silveira)
2. sex - student's sex (binary: "F" - female or "M" - male)
3. age - student's age (numeric: from 15 to 22)
4. address - student's home address type (binary: "U" - urban or "R" - rural)
5. famsize - family size (binary: "LE3" - less or equal to 3 or "GT3" - greater than 3)
6. Pstatus - parent's cohabitation status (binary: "T" - living together or "A" - apart)
7. Medu - mother's education (numeric: 0 - none,  1 - primary education (4th grade), 2 - 5th to 9th grade, 3 - secondary education or 4 - higher education)
8. Fedu - father's education (numeric: 0 - none,  1 - primary education (4th grade), 2 - 5th to 9th grade, 3 - secondary education or 4 - higher education)
9. Mjob - mother's job (nominal: "teacher", "health" care related, civil "services" (e.g. administrative or police), "at_home" or "other")
10. Fjob - father's job (nominal: "teacher", "health" care related, civil "services" (e.g. administrative or police), "at_home" or "other")
11. reason - reason to choose this school (nominal: close to "home", school "reputation", "course" preference or "other")
12. guardian - student's guardian (nominal: "mother", "father" or "other")
13. traveltime - home to school travel time (numeric: 1 - <15 min., 2 - 15 to 30 min., 3 - 30 min. to 1 hour, or 4 - >1 hour)
14. studytime - weekly study time (numeric: 1 - <2 hours, 2 - 2 to 5 hours, 3 - 5 to 10 hours, or 4 - >10 hours)
15. failures - number of past class failures (numeric: n if 1<=n<3, else 4)
16. schoolsup - extra educational support (binary: yes or no)
17. famsup - family educational support (binary: yes or no)
18. paid - extra paid classes within the course subject (Math or Portuguése) (binary: yes or no)
19. activities - extra-curricular activities (binary: yes or no)
20. nursery - attended nursery school (binary: yes or no)
21. higher - wants to take higher education (binary: yes or no)
22. internet - Internet access at home (binary: yes or no)
23. romantic - with a romantic relationship (binary: yes or no)
24. famrel - quality of family relationships (numeric: from 1 - very bad to 5 - excellent)
25. freetime - free time after school (numeric: from 1 - very low to 5 - very high)
26. goout - going out with friends (numeric: from 1 - very low to 5 - very high)
27. Dalc - workday alcohol consumption (numeric: from 1 - very low to 5 - very high)
28. Walc - weekend alcohol consumption (numeric: from 1 - very low to 5 - very high)
29. health - current health status (numeric: from 1 - very bad to 5 - very good)
30. absences - number of school absences (numeric: from 0 to 93)
31. G1 - first period grade (numeric: from 0 to 20)
32. G2 - second period grade (numeric: from 0 to 20)
33. G3 - final grade (numeric: from 0 to 20, output target)

Los pasos que se seguirán tienen que ser:
1. Cargar los datos en un RDD
2. Seleccionar sólo las columnas numéricas y añadir una nueva columna con un valor 0 si el alumno ha suspendido (G3 < 10) o 1 si ha aprobado (G3 >= 10)
3. Transformar el RDD en dataframe
4. Transformar el dataframe para dejarlo listo para hacer la predicción de la nota G3 y clasificación de si aprueba o no
5. Crear un modelo para predecir la nota G3 y medir la calidad con el conjunto de entrenamiento y el de test
6. Crear un modelo para clasificar al alumno en aprobado o suspenso y medir la calidad con el conjunto de entrenamiento y el de test

Para la ejecución de la tarea se ha utilizado el **entorno de Databricks con Spark 2.2.1**

___
## Preparación del entorno: paquetes y variables

Lo primero que vamos hacer es **importar todos los paquetes** necesarios y crear el contexto para los data frames (SQLContext)

In [4]:
# -*- coding: utf-8 -*-
%matplotlib inline

import re
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import corr
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

sqlc = SQLContext(sc)

A continuación se crean las **variables** para:
* La ruta del fichero y el nombre del fichero con las notas de los alumnos de matemáticas
* El separador de los campos en el fichero
* El umbral por defecto que se utilizará para saber si dos variables están correladas
* El nombre y posición de las columnas tipo numérico en el fichero; además del tipo de dato que será en el dataframe
* Un array con todos elementos del punto anterior para ser iterable. Además se añade la columna objetivo que valdrá 0 si el alumno ha suspendido (G3 < 10) o 1 en caso contrario (G3 >= 10). Las columnas objetivo (G3 y pass) siempre estarán al final de esta lista. De esta forma, se puede saber que columnas se pueden eliminar si están correladas de forma programática.

También se crea una función en la que se devuelve una lista con el nombre de las columnas numéricas.

In [6]:
# Ruta y nombre del fichero student y separador por defecto que utiliza. Ademas del umbral de correlacion por defecto
RUTA_FICHERO = '/FileStore/tables/student_mat-be366.csv'
SEPARADOR = ';'
UMBRAL_CORRELACION = 0.80

# Variables numericas del fichero, la que se crea para saber si un alumno aprueba o no y columna feature: 
#    - posicion 0: nombre de la columna en el fichero
#    - posicion 1: posicion de la columna en el fichero
#    - posicion 2: tipo para convertir de RDD a dataframe
#    - posicion 3: True si la columna puede ser nullable. Se utilizara al convertir de RDD a dataframe.
COL_AGE = ('age', 2, DoubleType(), True)
COL_MEDU = ('Medu', 6, DoubleType(), True)
COL_FEDU = ('Fedu', 7, DoubleType(), True)
COL_TRAVELTIME = ('traveltime', 12, DoubleType(), True)
COL_STUDYTIME = ('studytime', 13, DoubleType(), True)
COL_FAILURES = ('failures', 14, DoubleType(), True)
COL_FAMREL = ('famrel', 23, DoubleType(), True)
COL_FREETIME = ('freetime', 24, DoubleType(), True)
COL_GOOUT = ('goout', 25, DoubleType(), True)
COL_DALC = ('Dalc', 26, DoubleType(), True)
COL_WALC = ('Walc', 27, DoubleType(), True)
COL_HEALTH = ('health', 28, DoubleType(), True)
COL_ABSENCES = ('absences', 29, DoubleType(), True)
COL_G1 = ('G1', 30, DoubleType(), True)
COL_G2 = ('G2', 31, DoubleType(), True)
COL_G3 = ('G3', 32, DoubleType(), True)
COL_PASS = ('pass', None, DoubleType(), True)
COL_FEATURES = ('features', None, None, None)
COL_PREDICTION = ('prediction', None, None, None)

# Lista que almacena todas las variables numericas listadas arriba (excepto features) para una mejor iterazion.
# Las ultimas son las variables target
COLS_NUMERICAS = (
    COL_AGE, 
    COL_MEDU, 
    COL_FEDU, 
    COL_TRAVELTIME, 
    COL_STUDYTIME, 
    COL_FAILURES,
    COL_FAMREL, 
    COL_FREETIME, 
    COL_GOOUT, 
    COL_DALC, 
    COL_WALC, 
    COL_HEALTH, 
    COL_ABSENCES, 
    COL_G1, 
    COL_G2,
    COL_G3, 
    COL_PASS)
# De todos las columnas en la lista anterior, esta variable dice cuantas son target empezando desde atras
NUMERO_COLUMNAS_TARGET_EN_COLS_NUMERICAS = 2

# Obtiene el nombre de las cabeceras y las devuelve como una lista.
# parametro cols: lista con elementos que contienen el nombre de la columna. Cada elemento tiene que ser una 
# lista donde la posicion 0 es el nombre de la columna.
def obtenerNombreCabeceras(cols=COLS_NUMERICAS):
    """
    Obtiene el nombre de las cabeceras y las devuelve como una lista.

    :param cols: lista con elementos que contienen el nombre de la columna. Cada elemento tiene que ser una 
    lista donde la posicion 0 es el nombre de la columna. Por defecto es la variable COLS_NUMERICAS
    :return: lista con el nombre de las columnas pasadas por parametro.
    """
    
    nombres = list()
    for col in cols:
        nombres.append(col[0])
    return nombres

___
## Creación del RDD y filtrado columnas numéricas

Una vez que se han definido las variables, ya se puede **crear el RDD**. Para ello:
* Se carga el fichero
* Se elimina la cabecera del fichero utilizando filter
* Utilizando una función map se obtienen las columnas numéricas:
    * Se divide la línea por ;
    * Se seleccionan las columnas numéricas
    * Se eliminan " ya que algunos campos numéricos empiezan y/o terminan por esta símbolo
    * Se trasforma en float
    * Se añade una columna al final que vale 0 si G3 < 10 (suspenso/a) y 1 si G3 >= 10 (aprobado/a)

In [8]:
def convertirANumeroFiltrarYAnyadirPass(linea, separador=SEPARADOR, cols=COLS_NUMERICAS, colTarget=COL_G3):
    """
    Esta funcion:
        1. Divide la linea en elementos separados por ;
        2. Filtra por las columnas que se ha pasado por parametro cols
        3. Elimina " para no tener problemas al hacer el cast a float
        4. Convierte a numero los campos que son numericos
        5. Se crea un nuevo campo al final con el valor float 0 si G3 es menor que 10 y 1 si es mayor o igual que 10

    :param linea: linea del fichero separada por el separador
    :param separador: separador de los campos que hay en la linea. Por defecto es SEPARADOR
    :param cols: lista con elementos que contienen el nombre de la columna. Cada elemento tiene que ser una 
    lista donde la posicion 1 es la posicion en la linea. Por defecto es la variable COLS_NUMERICAS
    :param colTarget: lista donde la posicion 0 es el nombre de la columna target para crear la variable de si ha aprobado
    o no. Por defecto es la variable COL_G3
    :return: lista con los valores float de los campos que se han especificado para filtrar (en el orden pasado en cols) mas 
    un elemento al final cuyo valor es 0 si colTarget es menor que 10 y 1 si es mayor o igual
    """
    
    lineaSeparada = linea.split(separador)
    
    listaFloat = list()
    for col in cols:
        if col[1]:
            listaFloat.append(float(eliminarDobleComas(lineaSeparada[col[1]])))
    
    notaFinal = float(eliminarDobleComas(lineaSeparada[colTarget[1]]))
    if notaFinal < 10:
        listaFloat.append(float(0))
    else:
        listaFloat.append(float(1))
        
    return listaFloat

def eliminarDobleComas(elemento):
    """
    Elimina cualquier " que haya en el parametro.
    
    :param elemento: string del que se quiere eliminar "
    :return: elemento sin ningun "
    """
        
    return re.sub('"+', '', elemento)

In [9]:
# Se lee el csv y se elimina la cabecera
fichero = sc.textFile(RUTA_FICHERO)
cabeceraFichero = fichero.first()
datos = fichero.filter(lambda x: x!=cabeceraFichero)

# Se hace el map para dividir las filas, quedarse con los campos numericos y convertirlos a float, y anyadir si ha pasado o no
datosSoloNumerosYPass = datos.map(convertirANumeroFiltrarYAnyadirPass)
datosSoloNumerosYPass.take(3)

___
## Trasformación RDD a dataframe y estudio de las variables

Se **trasforma el RDD en un dataframe** creando un esquema para tal fin utilizando la variable COLS_NUMERICAS para hacerlo de forma programática:

In [11]:
# Esquema que se utiliza para convertir de RDD a Dataframe. Para ello utiliza la variable del principio COLS_NUMERICAS:
# nombre de la columna, tipo en Spark, si es nullable
esquema = StructType()
for col in COLS_NUMERICAS:
    esquema.add(StructField(col[0], col[2], col[3]))
    
# Se convierte el RDD a dataframe
dataframe = sqlc.createDataFrame(datosSoloNumerosYPass, esquema)
dataframe.show(3)

Se muestran las características de cada columna ya que si se hace diretamente sobre todo el dataframe la salida no es muy legible:

In [13]:
# Para que se vea claro en pantalla se imprime columna por columna el describe
for nombrecolumna in obtenerNombreCabeceras():
    print dataframe.select(nombrecolumna).describe().show()

El siguiente paso es ver la **correlación** entre las columnas y luego eliminar las independientes que estén bastante correladas con otras, esto es, superior al 0.80

Para ello se crean dos funciones:
* Una que recorra el dataframe y obtenga la correlación columna por columna
* Otra que obtenga el nombre de las columnas independientes que no estén correladas
Después, utilizando select se seleccionan las columnas no correladas además de las columnas objetivo (que corresponden a las dos últimas del dataframe):

In [15]:
def obtenerCorrelacionColumnasDataframe(dataframe, cols=COLS_NUMERICAS):
    """
    Obtiene la correlación entre todas columnas del dataframe.

    :param dataframe: dataframe del que se quiere obtener la correlación de las columnas. 
    :param cols: lista con las columnas que se quiere obtener la correlación. Cada elemento tiene que ser una 
    lista donde la posicion 0 es el nombre de la columna. Por defecto es la variable COLS_NUMERICAS
    :return: lista donde cada element es una list con el nombre de la columna 1, nombre columna 2 y correlación 
    """
    
    longitudCols = len(cols)
    # lista con las listas nombre columna 1, nombre columna 2, correlación
    todasNombresCorrelacion = list()
    numeroactualCols1 = 0
    
    # Recorre todas las columnas en cols. Esta sera columna 1
    for col1 in cols:
        
        # Las siguientes columnas seran columna 2 de tal forma que se pueda crear (columna 1, columna 2, correlación)
        numeroactualCols2 = numeroactualCols1 + 1
        while numeroactualCols2 < longitudCols:
            col2 = cols[numeroactualCols2]
            # Se calcula la correlación entre columna 1 y columna 2
            correlacion = dataframe.corr(col1[0], col2[0])
            # Se crea la lista nombre columna 1 - nombre columna 2 - correlación
            nombresCorrelacion = [col1[0], col2[0], correlacion]
            todasNombresCorrelacion.append(nombresCorrelacion)
            numeroactualCols2 += 1
            
        numeroactualCols1 += 1
    return todasNombresCorrelacion

def obtenerCabecerasNoCorrelacionadas(listaCol1Col2Correlacion, cols=COLS_NUMERICAS, 
                                      colsTarget=COLS_NUMERICAS[-NUMERO_COLUMNAS_TARGET_EN_COLS_NUMERICAS:], 
                                      umbral=UMBRAL_CORRELACION):
    """
    Obtiene todas las cabeceras cuyas correlación es inferior al umbral.

    :param listaCol1Col2Correlacion: lista cuyos elementos son listas con el formato 
    nombre columna 1, nombre columna 2, correlacion
    :param cols: lista con las columnas de las correlaciones. Cada elemento tiene que ser una lista donde 
    la posicion 0 es el nombre de la columna. Por defecto es la variable COLS_NUMERICAS.
    :param colsTarget: lista con las columnas de las correlaciones que son target y no se eliminaran.
    Cada elemento tiene que ser una lista donde la posicion 0 es el nombre de la columna. 
    Por defecto es la variable COLS_NUMERICAS[-NUMERO_COLUMNAS_TARGET_EN_COLS_NUMERICAS:].
    :param umbral: umbral que dice si dos columnas estan correlaciondas y por tanto 1 de ellas sera eliminada. Por 
    defecto es UMBRAL_CORRELACION, si es superior a 1 o menor que 0 se pone a valor UMBRAL_CORRELACION
    :return: lista con el nombre de las columnas que no estan correlacionadas con otras, ademas de las columnas target
    """
        
    # Se comprueba que el umbral sea mayor que 0 y menor que 1. Si no se pone el de por defecto
    if umbral < 0 or umbral > 1:
        umbral = UMBRAL_CORRELACION
    
    # Se obtienen todos los nombres de las columnas. A partir de aqui se iran eliminando las que estan correlacionadas
    cabeceras = obtenerNombreCabeceras(cols)
    nombreColsTarget = obtenerNombreCabeceras(colsTarget)
    
    # Se recorre la lista con las correlaciones. Si es mayor que el umbral y ninguna de las columnas son 
    # target, se elimina una de ellas
    for col1Col2Correlacion in listaCol1Col2Correlacion:
        if col1Col2Correlacion[0] not in nombreColsTarget and col1Col2Correlacion[1] not in nombreColsTarget:
            if (-umbral >= col1Col2Correlacion[2] or umbral <= col1Col2Correlacion[2]) and col1Col2Correlacion[0] in cabeceras:
                cabeceras.remove(col1Col2Correlacion[0])
    return cabeceras

In [16]:
col1Col2Correlaciones = obtenerCorrelacionColumnasDataframe(dataframe)
print 'Correlación entre columnas columna 1 - columna 2 - correlación: {0}'.format(col1Col2Correlaciones)

In [17]:
cabecerasParaSelect = obtenerCabecerasNoCorrelacionadas(col1Col2Correlaciones)
print 'Total cabeceras cuya correlación no es superior al umbral (mas las columnas target): {0}. Estas cabeceras son: {1}'.format(len(cabecerasParaSelect), cabecerasParaSelect)

cabecerasEliminadas = list()
for col in obtenerNombreCabeceras():
    if col not in cabecerasParaSelect:
        cabecerasEliminadas.append(col)
print 'Total cabeceras eliminadas: {0}. Cabeceras eliminadas: {1}'.format(len(cabecerasEliminadas), cabecerasEliminadas)

In [18]:
dataframeReducido = dataframe.select(cabecerasParaSelect)
dataframeReducido.show(5)

___
## Creación dataframes listos para los modelos

Los dos último pasos antes de crear los modelos son:
* Añadir las variables independientes en un vector. Después se guardarán en cache los dos conjuntos para que el rendimiento sea mejor.
* Separar el conjunto entre conjunto de entrenamiento y el de test. La relación será 70/30

In [20]:
# Antes de crear el VectorAssembler se obtienen el nombre de las columnas de las variables independientes
# Para ello se coge la lista con las columnas no correladas y se eliminan las target (que seran las ultimas en COLS_NUMERICAS)
cabecerasVectorAssembler = cabecerasParaSelect[:]
for cabeceraTarget in obtenerNombreCabeceras()[-NUMERO_COLUMNAS_TARGET_EN_COLS_NUMERICAS:]:
    cabecerasVectorAssembler.remove(cabeceraTarget)

# Se crea el VectorAssembler poniendo el resultado en COL_FEATURES[0] columna
vectorAssembler = VectorAssembler(
    inputCols=cabecerasVectorAssembler, 
    outputCol=COL_FEATURES[0])

dataframeEnsamblado = vectorAssembler.transform(dataframeReducido)
dataframeEnsamblado.show(3)

dataframeEnsamblado.cache()

In [21]:
# Se divide el dataframe en entrenamiento y test
dataframeEnsambladoDividido = dataframeEnsamblado.randomSplit([0.7, 0.3], 1234)
dataframeEnsambladoEntrenamiento = dataframeEnsambladoDividido[0]
dataframeEnsambladoTest = dataframeEnsambladoDividido[1]

___
## Predicción: modelo de regresión 

Para la contrucción del modelo de predicción se utiliza una regresión lineal con **cross validation**. Los valores que se le pasará son:
* regParam: 0.0, 0.01, 0.05, 0.5
* maxIter: 5, 10
* numFolds: 5

Una vez conseguido el modelo, se muestra el intercept y los coeficientes del mejor de ellos. Se puede ver el gran peso que tiene G2 en la predicción de G3:

In [23]:
# Se crea el evaluador para este modelo siendo la columna objetivo COL_G3[0]
evaluatorRegression = RegressionEvaluator(labelCol=COL_G3[0])

# Se crea la regresión lineal con solve=normal 
linearRegression = LinearRegression(solver='normal', labelCol=COL_G3[0], featuresCol=COL_FEATURES[0])
# Crossvalidation: los parámetros escogidos son:
#     * regParam: 0.0, 0.01, 0.05, 0.5
#     * maxIter: 5, 10
#     * numFolds: 5
gridLinearRegression = ParamGridBuilder(). \
                        addGrid(linearRegression.regParam, [0.0, 0.01, 0.05, 0.5]). \
                        addGrid(linearRegression.maxIter, [5, 10]).build()
crossValidatorLinearRegression = CrossValidator(estimator=linearRegression, \
                                                estimatorParamMaps=gridLinearRegression, \
                                                evaluator=evaluatorRegression, \
                                                numFolds=5)
# Se obtiene el modelo con los datos de entrada
crossValidatorLinearRegressionModel = crossValidatorLinearRegression.fit(dataframeEnsambladoEntrenamiento)

# Se muestra el intercept y los coeficientes del mejor modelo
print 'Intercept del mejor modelo: {0}'.format(crossValidatorLinearRegressionModel.bestModel.intercept)
print 'Coeficientes del mejor modelo: {0}'.format(crossValidatorLinearRegressionModel.bestModel.coefficients)

Se obtiene la predicción para el conjunto de entrenamiento y el de test con el mejor modelo para posteriormente mostrar el **RMSE**:

In [25]:
# Se obtiene la predicción sobre el conjunto de test y entrenamiento
dataframeEnsambladoEntrenamientoPrediccion = crossValidatorLinearRegressionModel.bestModel.transform(dataframeEnsambladoEntrenamiento)
dataframeEnsambladoTestPrediccion = crossValidatorLinearRegressionModel.bestModel.transform(dataframeEnsambladoTest)

# Se obtiene el RMSE sobre los dos conjuntos
rmseLinearRegressionEntrenamiento = evaluatorRegression.evaluate(dataframeEnsambladoEntrenamientoPrediccion, {evaluatorRegression.metricName: 'rmse'})
rmseLinearRegressionTest = evaluatorRegression.evaluate(dataframeEnsambladoTestPrediccion, {evaluatorRegression.metricName: 'rmse'})
# Se imprime por pantalla
print 'RMSE en training: {0}'.format(rmseLinearRegressionEntrenamiento)
print 'RMSE en test: {0}'.format(rmseLinearRegressionTest)

Como se puede observer el error en el conjunto de entrenamiento es ligeramente superior al de test. No obstante ninguno supera el valor 2.

A continuación se muestran en un gráfico la nota real frente a la predicha tanto para el conjunto de test como para el de entrenamiento. Como se podrá observar la gran mayoría se están al rededor de la líena azul que sería la perfección. En los que se puede ver una gran diferencia entre lo predicho y lo real es cuando esta última es 0:

In [27]:
# Se crea la lista con la nota real y la predicha
xEntrenamiento, yEntrenamiento = list(), list()
for entrenamientoPrediccion in dataframeEnsambladoEntrenamientoPrediccion.collect():
    xEntrenamiento.append(entrenamientoPrediccion[COL_G3[0]])
    yEntrenamiento.append(entrenamientoPrediccion[COL_PREDICTION[0]])
xTest, yTest = list(), list()
for testPrediccion in dataframeEnsambladoTestPrediccion.collect():
    xTest.append(testPrediccion[COL_G3[0]])
    yTest.append(testPrediccion[COL_PREDICTION[0]])

# Se crea el gráfico y se muestra
plt.clf()
plt.xlim(0, 20)
plt.ylim(0, 20)
plt.xlabel('Nota real')
plt.ylabel('Nota segun el modelo')
plt.title('Nota real vs prediccion Regresion lineal conjunto test')

plt.plot([0, 20], [0, 20], 'b')
# Se pasan los datos de entrenamiento y test al gráfico
plt.plot(xEntrenamiento, yEntrenamiento, 'go', label='Entrenamiento')
plt.plot(xTest, yTest, 'ro', label='Test')
plt.legend(loc='lower right')

plt.show()
display()


___
## Clasificación: árboles de decisión 

Para la contrucción del modelo de clasificación se ha obtado por un árbol de decisión con **cross validation**. Los valores que se le pasará son:
* maxDepth: 3, 6, 10
* maxBins: 20, 40, 80
* numFolds: 5

Una vez conseguido el modelo, se muestra el número de nodos, la profundidad y el árbol en sí del mejor de ellos:

In [29]:
# Se crea el evaluador para este modelo siendo la columna objetivo COL_PASS[0]
evaluatorBinaryClassification = BinaryClassificationEvaluator(labelCol=COL_PASS[0])

# Se crea el árbol de decisión
decisionTreeClassifier = DecisionTreeClassifier(labelCol=COL_PASS[0], featuresCol=COL_FEATURES[0])
# Crossvalidation: los parámetros escogidos son:
#     * maxDepth: 3, 6, 10
#     * maxBins: 20, 40, 80
#     * numFolds: 5
gridTreeClassifier = ParamGridBuilder(). \
                        addGrid(decisionTreeClassifier.maxDepth, [3, 6, 10]). \
                        addGrid(decisionTreeClassifier.maxBins, [20, 40, 80]).build()
crossValidatorDecisionTree = CrossValidator(estimator=decisionTreeClassifier, \
                                                estimatorParamMaps=gridTreeClassifier, \
                                                evaluator=evaluatorBinaryClassification, \
                                                numFolds=5)
# Se obtiene el modelo con los datos de entrada
crossValidatorDecisionTreeModel = crossValidatorDecisionTree.fit(dataframeEnsambladoEntrenamiento)

# Se muestra el número de nodos, profundida y el árbol mejor modelo
print 'Número del nodos del mejor modelo: {0}'.format(crossValidatorDecisionTreeModel.bestModel.numNodes)
print 'Profundidad del mejor modelo: {0}'.format(crossValidatorDecisionTreeModel.bestModel.depth)
print 'Árbol mejor modelo: {0}'.format(crossValidatorDecisionTreeModel.bestModel.toDebugString)

Se puede ver que el campo más significativo en este árbol es feature 13 que corresponde a G2. Era de esperar que tuviese mucha importancia porque está muy correlado con la variable dependiente.

A continuación se obtiene la predicción para el conjunto de entrenamiento y el de test con el mejor modelo para posteriormente mostrar el **área bajo la curva ROC**:

In [31]:
# Se obtiene la predicción sobre el conjunto de test y entrenamiento
dataframeEnsambladoEntrenamientoClasificacionPrediccion = crossValidatorDecisionTreeModel.bestModel.transform(dataframeEnsambladoEntrenamiento)
dataframeEnsambladoTestClasificacionPrediccion = crossValidatorDecisionTreeModel.bestModel.transform(dataframeEnsambladoTest)

# Se obtiene el RMSE sobre los dos conjuntos
arearocDecisionTreeEntrenamiento = evaluatorBinaryClassification.evaluate(\
                                                                          dataframeEnsambladoEntrenamientoClasificacionPrediccion, \
                                                                          {evaluatorBinaryClassification.metricName: 'areaUnderROC'})
arearocDecisionTreeTest = evaluatorBinaryClassification.evaluate(\
                                                                 dataframeEnsambladoTestClasificacionPrediccion, \
                                                                 {evaluatorBinaryClassification.metricName: 'areaUnderROC'})
# Se imprime por pantalla
print 'Área bajo la curva ROC en training: {0}'.format(arearocDecisionTreeEntrenamiento)
print 'Área bajo la curva ROC en test: {0}'.format(arearocDecisionTreeTest)

Como se puede observar está muy cerca del ideal. Y por último se dibuja la curva ROC. El área bajo la curva no es la misma que la obtenida a través del modelo. Esto puede deberse a que no se está teniendo en cuenta la probabilidad que estima el modelo:

In [33]:
# Se crea la lista con la nota real y la predicha
xEntrenamiento, yEntrenamiento = list(), list()
for entrenamientoPrediccion in dataframeEnsambladoEntrenamientoClasificacionPrediccion.collect():
    xEntrenamiento.append(entrenamientoPrediccion[COL_PASS[0]])
    yEntrenamiento.append(entrenamientoPrediccion[COL_PREDICTION[0]])
xTest, yTest = list(), list()
for testPrediccion in dataframeEnsambladoTestClasificacionPrediccion.collect():
    xTest.append(testPrediccion[COL_PASS[0]])
    yTest.append(testPrediccion[COL_PREDICTION[0]])

# Se obtienen los falsos positivos y verdaderos positivos y el valor bajo la curva para el test y el entrenamiento
falsePositiveRateEntrenamiento, truePositiveRateEntrenamiento, thresholdsEntrenamiento = roc_curve(xEntrenamiento, yEntrenamiento)
rocAucEntrenamiento = auc(falsePositiveRateEntrenamiento, truePositiveRateEntrenamiento)
falsePositiveRateTest, truePositiveRateTest, thresholdsTest = roc_curve(xTest, yTest)
rocAucTest = auc(falsePositiveRateTest, truePositiveRateTest)

# Se crea el gráfico y se muestra
plt.clf()
plt.title('Curva ROC con Arbol de decision')
# Se pasan los datos de entrenamiento y test al gráfico
plt.plot(falsePositiveRateEntrenamiento, truePositiveRateEntrenamiento, 'g', label='AUC Entrenamiento = %0.2f'% rocAucEntrenamiento)
plt.plot(falsePositiveRateTest, truePositiveRateTest, 'r', label='AUC Test = %0.2f'% rocAucTest)
plt.legend(loc='lower right')
plt.plot([0,1], [0,1], 'r--')
plt.xlim(0, 1)
plt.ylim(0, 1)
plt.ylabel('True Positive Ratio')
plt.xlabel('False Positive Ratio')
plt.show()
display()

___
## Conclusión
Como se ha visto, es muy sencillo la carga de datos en RDD/dataframe, modificación de los distintos campos y preparación para los modelos.

A modo de ejemplo, aunque no se ha utilizado en la tarea, se podría escalar todas las columnas en el dataframe de la siguiente manera:
```python
scaler = StandardScaler(inputCol=COL_FEATURES[0], outputCol='scaled'+COL_FEATURES[0], withStd=True, withMean=True)
scalerModel = scaler.fit(dataframeEnsamblado)
scaledData = scalerModel.transform(dataframeEnsamblado)
scaledData.select(COL_FEATURES[0], 'scaled'+COL_FEATURES[0]).show(3)```

Después se pueden crear modelos de predicción y clasificación con distintos algoritmos. Si se quisiese utilizar otro algoritmo, bastaría con cambiar la clase ya que todas tienen la misma interfaz. De hecho, se podría utilizar un bucle con distintos algorimos y quedarnos con el mejor de ellos.