# M7 Tarea 2 - Machine Learning con Spark

Tatan Rufino

___

Los pasos que se seguirán tienen que ser:

1. Carga del fichero de datos en un RDD.
2. Mediante operaciones sobre el RDD quedarse únicamente con los campos numéricos y añadir un nuevo campo que a partir de la nota (G3) tome un valor de “aprobado/suspendido” (1/0) dependiendo de si la nota es mayor o menor que 10.
3. Creación de un DataFrame Spark a partir del RDD anterior.
4. Sobre ese DataFrame aplicar todos los pasos necesarios que se han visto en los ejemplos del manual para construir el modelo de regresión (predicción) comentado anteriormente y su posterior evaluación (medición de errores sobre test y training).
5. Idem para el modelo de clasificación
6. Se valorará como un extra la utilización de funciones de correlación para eliminación de predictores altamente correlados y la construcción del modelo utilizando técnicas de cross-validation
  - Este ejercicio se hará en paralelo a la construcción de los modelos (regresión y clasificación) en la medición de errores sobre test y training

___
## Librerías y preparación de variables

In [4]:

import re
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import *
from pyspark.sql.functions import corr
from pyspark.ml.feature import VectorAssembler, VectorIndexer, StringIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression, LinearRegressionModel
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve, auc

sqlc = SQLContext(sc)

A continuación se crean las **variables** para:
* La ruta del fichero y el nombre del fichero con las notas de los alumnos de matemáticas
* El separador de los campos en el fichero
* El umbral por defecto que se utilizará para saber si dos variables están correladas
* El nombre y posición de los predictores tipo numérico en el fichero; además del tipo de dato que será en el dataframe
* Un array (lista) con todos elementos del punto anterior para ser iterable. Además se añade la columna objetivo que valdrá 0 si el alumno ha suspendido (G3 < 10) o 1 en caso contrario (G3 >= 10). Las columnas objetivo (G3 y pass) siempre estarán al final de esta lista. De esta forma, podremos adivinar que columnas se pueden eliminar si están correladas.

In [6]:
# Ruta y nombre del fichero student y separador por defecto que utiliza. Ademas del umbral de correlacion por defecto
PATH_FILE = '/FileStore/tables/student_mat-be366.csv'
UMBRAL_CORRELACION = 0.80 ### Este será el umbral escogido para eliminar los predictores altamente correlados

# Agrupación de predictores numéricos, variable PASS (basada en las CC del enunciado) en relación a si el alumno pasa o no, y las variables para los modelos 
#    - posicion 0: Nombre de la columna en el fichero
#    - posicion 1: Posicion de la columna en el fichero
#    - posicion 2: Tipo para convertir de RDD a dataframe
#    - posicion 3: True si la columna puede ser null. Se utilizara al convertir de RDD a dataframe.

PRED_AGE = ('age', 2, DoubleType(), True)
PRED_MEDU = ('Medu', 6, DoubleType(), True)
PRED_FEDU = ('Fedu', 7, DoubleType(), True)
PRED_TRAVELTIME = ('traveltime', 12, DoubleType(), True)
PRED_STUDYTIME = ('studytime', 13, DoubleType(), True)
PRED_FAILURES = ('failures', 14, DoubleType(), True)
PRED_FAMREL = ('famrel', 23, DoubleType(), True)
PRED_FREETIME = ('freetime', 24, DoubleType(), True)
PRED_GOOUT = ('goout', 25, DoubleType(), True)
PRED_DALC = ('Dalc', 26, DoubleType(), True)
PRED_WALC = ('Walc', 27, DoubleType(), True)
PRED_HEALTH = ('health', 28, DoubleType(), True)
PRED_ABSENCES = ('absences', 29, DoubleType(), True)
PRED_G1 = ('G1', 30, DoubleType(), True)
PRED_G2 = ('G2', 31, DoubleType(), True)
PRED_G3 = ('G3', 32, DoubleType(), True)
PRED_PASS = ('pass', None, DoubleType(), True)

# ML variables
PRED_FEATURES = ('features', None, None, None)
PRED_PREDICTION = ('prediction', None, None, None)

# Creamos una lista con todos los predictores para poder iterar en adelante para las sucesivas operaciones y hacerlo más eficiente

PRED_NUMERICOS = (
    PRED_AGE, 
    PRED_MEDU, 
    PRED_FEDU, 
    PRED_TRAVELTIME, 
    PRED_STUDYTIME, 
    PRED_FAILURES,
    PRED_FAMREL, 
    PRED_FREETIME, 
    PRED_GOOUT, 
    PRED_DALC, 
    PRED_WALC, 
    PRED_HEALTH, 
    PRED_ABSENCES, 
    PRED_G1, 
    PRED_G2,
    PRED_G3, 
    PRED_PASS)

# De todos las columnas en la lista anterior, esta variable dice cuantas son target empezando desde atras
PREDICTORES_OBJETIVOS = 2


___
## Creación del RDD y filtrado de predictores numéricos

Una vez que se han definido las variables, ya se puede **crear el RDD**. Para ello:
* Se cargan los datos
* Se elimina la cabecera del fichero utilizando filter
* Se crea una función en la que se devuelve una lista con el nombre de los predictores numéricos. (get_cabeceras)
* Utilizando una función map se obtienen los predictores numéricos: (soporte_mapper)
    * Se divide la línea por ;
    * Se seleccionan los predictores numéricos
    * Se eliminan " para evitar errores de sintaxis
    * Se transforma a variable tipo "float"
    * Se añade una columna PASS al final, basándonos en la exigencia del enunciado de que si 0 si G3 < 10 (suspenso/a) y 1 si G3 >= 10 (aprobado/a)
* Se crea una función que elimina las dobles comas (rm_commas)

In [8]:
# Función creada para iterar de forma eficiente las listas, obtener las cabeceras y guardarlas en otra lista lista

def get_cabeceras(preds=PRED_NUMERICOS): ##Le metemos como argumento por defecto la lista de predictores numéricos
    
    cabeceras = list()
    for pred in preds:
        cabeceras.append(pred[0])
    return cabeceras

# Función creada para generar el Rdd de datos basando en las condiciones de contorno establecidas
   
def soporte_mapper(linea, separador=';', cols=PRED_NUMERICOS, colTarget=PRED_G3): #De nuevo lleva de argumentos las variables deseadas
     
    lineaSeparada = linea.split(separador)
    
    listaFloat = list()
    for col in cols:
        if col[1]:
            listaFloat.append(float(eliminarDobleComas(lineaSeparada[col[1]])))
    
    notaFinal = float(eliminarDobleComas(lineaSeparada[colTarget[1]]))
    if notaFinal < 10:
        listaFloat.append(float(0))
    else:
        listaFloat.append(float(1))
        
    return listaFloat

# Función para eliminar dobles comas
  
def rm_commas(input):
  
      return re.sub('"+', '', input)

In [9]:
# Se carga el archivo
datos = sc.textFile(PATH_FILE)
cabeceraDatos = datos.first()
Rdd_tmp = datos.filter(lambda x: x!=cabeceraDatos)

# Se hace el map para dividir las filas, quedarse con los campos numericos y convertirlos a float, y anyadir si ha pasado o no
Rdd = Rdd_tmp.map(soporte_mapper)
Rdd.take(3)

___
## Trasformación RDD a dataframe y estudio de las variables

Se **trasforma el RDD en un dataframe** creando un esquema para tal fin utilizando la variable PRED_NUMERICOS:

In [11]:
# Creamos el esquema que nos sirve de soporte para la generación del Rdd, usando el array PRED_NUMERICOS

schema = StructType()
for pred in PRED_NUMERICOS:
    schema.add(StructField(pred[0], pred[2], pred[3]))
    
# Se convierte el RDD a dataframe
DF = sqlc.createDataFrame(Rdd, schema)
DF.show(3)

Hacemos un summary de los diferentes predictores y sus indicadores principales

In [13]:
for predictor in get_cabeceras():
    print(DF.select(predictor).describe().show())

Ahora estudiaremos la **correlación entre predictores** y para eliminar las que esten fuertemente correladas (cuya independencia sea superior al 80%)

Para ello se crean dos funciones:
* Una función que recorra el DF y obtenga la correlación de predictor a predictor (get_correlations)
* Una función que nombre de predictores independientes que no estén correlados. Finalmente nos quedamos con los predictores no correlados y los objetivos (get_no_correlated)

In [15]:
def get_correlations(DF, preds=PRED_NUMERICOS):
   
    longitudpreds = len(preds)
    # lista con las listas nombre pred 1, nombre pred 2, correlación
    todasNombresCorrelacion = list()
    numeroactualpreds1 = 0
    
    # Recorre todas las predumnas en preds. Esta sera pred 1
    for pred1 in preds:
        
        # Los siguientes preds seran pred 2 de tal forma que se pueda crear (pred 1, pred 2, correlación)
        numeroactualpreds2 = numeroactualpreds1 + 1
        while numeroactualpreds2 < longitudpreds:
            pred2 = preds[numeroactualpreds2]
            # Se calcula la correlación entre pred 1 y pred 2
            correlacion = dataframe.corr(pred1[0], pred2[0])
            # Se crea la lista nombre pred 1 - nombre pred 2 - correlación
            nombresCorrelacion = [pred1[0], pred2[0], correlacion]
            todasNombresCorrelacion.append(nombresCorrelacion)
            numeroactualpreds2 += 1
            
        numeroactualpreds1 += 1
    return todasNombresCorrelacion

def get_no_correlated(listaPred1Pred2Correlacion, preds=PRED_NUMERICOS, 
                                      predsTarget=PRED_NUMERICOS[-PREDICTORES_OBJETIVOS:], 
                                      umbral=UMBRAL_CORRELACION):
        
    if umbral < 0 or umbral > 1:
        umbral = UMBRAL_CORRELACION

    cabeceras = obtenerNombreCabeceras(preds)
    nombrepredsTarget = obtenerNombreCabeceras(predsTarget)
    
    for Pred1Pred2Correlacion in listaPred1Pred2Correlacion:
        if Pred1Pred2Correlacion[0] not in nombrepredsTarget and Pred1Pred2Correlacion[1] not in nombrepredsTarget:
            if (-umbral >= Pred1Pred2Correlacion[2] or umbral <= Pred1Pred2Correlacion[2]) and Pred1Pred2Correlacion[0] in cabeceras:
                cabeceras.remove(Pred1Pred2Correlacion[0])
    return cabeceras

In [16]:
Pred1Pred2Correlaciones = get_correlations(DF)
print('Correlación entre predictor 1 - predictor 2 - correlación: {0}'.format(Pred1Pred2Correlaciones))

In [17]:
predictoresNoCorrelados = get_no_correlated(Pred1Pred2Correlaciones)
print('Total predictores cuya correlación no es superior al umbral: {0}. Estos predictores son: {1}'.format(len(predictoresNoCorrelados), predictoresNoCorrelados))

predictoresEliminados = list()
for col in get_cabeceras():
    if col not in predictoresNoCorrelados:
        predictoresEliminados.append(col)
print('Total predictores eliminados: {0}. Predictores eliminados: {1}'.format(len(predictoresEliminados), predictoresEliminados))

In [18]:
DFReduced = DF.select(predictoresNoCorrelados)
DFReduced.show(5)

___
## Adecuación de los Dataframes para los modelos

Se tiene en cuenta lo siguiente:
* Añadir las variables independientes en un vector a caché.
* Dividir el conjunto de entrenamiento y el de test , cuya  relación será del 70/30

In [20]:

cabecerasVectorAssembler = cabecerasParaSelect[:]
for cabeceraTarget in get_cabeceras()[-PREDICTORES_OBJETIVOS:]:
    cabecerasVectorAssembler.remove(cabeceraTarget)

# Se crea el VectorAssembler poniendo el resultado en COL_FEATURES[0] columna
vectorAssembler = VectorAssembler(
    inputCols=cabecerasVectorAssembler, 
    outputCol=PRED_FEATURES[0])

DFAssembled = vectorAssembler.transform(DFReduced)
DFAssembled.show(3)

DFAssembled.cache()

In [21]:
# Se divide el dataframe en entrenamiento y test
splitter = DFAssembled.randomSplit([0.7, 0.3], 1234)
DFAssembledTrain = splitter[0]
DFAssembledTest = splitter[1]

___
## Modelo de predicción: Regresión Lineal

Se contruirá el modelo de predicción con una regresión lineal evaluada con **cross validation**, con los siguientes valores:

* regParam: 0.0, 0.01, 0.05, 0.5
* maxIter: 5, 10
* numFolds: 5

In [23]:
# Se crea el evaluador para este modelo siendo la columna objetivo COL_G3[0]
evaluatorRegression = RegressionEvaluator(labelCol=PRED_G3[0])

# Se crea la regresión lineal con solve=normal 
linearRegression = LinearRegression(solver='normal', labelCol=PRED_G3[0], featuresCol=PRED_FEATURES[0])
# Crossvalidation: los parámetros escogidos son:
#     * regParam: 0.0, 0.01, 0.05, 0.5
#     * maxIter: 5, 10
#     * numFolds: 5
gridLinearRegression = ParamGridBuilder(). \
                        addGrid(linearRegression.regParam, [0.0, 0.01, 0.05, 0.5]). \
                        addGrid(linearRegression.maxIter, [5, 10]).build()
crossValidatorLinearRegression = CrossValidator(estimator=linearRegression, \
                                                estimatorParamMaps=gridLinearRegression, \
                                                evaluator=evaluatorRegression, \
                                                numFolds=5)
# Se obtiene el modelo con los datos de entrada
crossValidatorLinearRegressionModel = crossValidatorLinearRegression.fit(DFAssembledTrain)

# Se muestra el intercept y los coeficientes del mejor modelo
print('Intercept del mejor modelo: {0}'.format(crossValidatorLinearRegressionModel.bestModel.intercept))
print('Coeficientes del mejor modelo: {0}'.format(crossValidatorLinearRegressionModel.bestModel.coefficients))

Se muestra el intercept y los coeficientes del mejor de ellos. 

Se observa el gran peso que tiene G2 en la predicción de G3.

A continuación se obtiene la predicción para el conjunto de entrenamiento y el de test con el mejor modelo para posteriormente mostrar el **RMSE**:

In [25]:
# Se obtiene la predicción sobre el conjunto de test y entrenamiento
DFAssembledTrainPrediction = crossValidatorLinearRegressionModel.bestModel.transform(DFAssembledTrain)
DFAssembledTestPrediction = crossValidatorLinearRegressionModel.bestModel.transform(DFAssembledTest)

# Se obtiene el RMSE sobre los dos conjuntos
rmseLinearRegressionTrain = evaluatorRegression.evaluate(DFAssembledTrainPrediction, {evaluatorRegression.metricName: 'rmse'})
rmseLinearRegressionTest = evaluatorRegression.evaluate(DFAssembledTestPrediction, {evaluatorRegression.metricName: 'rmse'})
# Se imprime por pantalla
print('RMSE en training: {0}'.format(rmseLinearRegressionTrain))
print('RMSE en test: {0}'.format(rmseLinearRegressionTest))

Como se puede observar, el error en el conjunto de entrenamiento es ligeramente superior al de test. No obstante ninguno supera el valor 2.

A continuación se muestran en un gráfico la nota real frente a la predicha tanto para el conjunto de test como para el de entrenamiento. Como se podrá observar, la gran mayoría se están alrededor de la línea azul, que sería la perfección:

In [27]:
# Se crea la lista con la nota real y la predicha
xTrain, yTrain = list(), list()
for trainPrediction in DFAssembledTrainPrediction.collect():
    xTrain.append(trainPrediction[PRED_G3[0]])
    yTrain.append(trainPrediction[PRED_PREDICTION[0]])
xTest, yTest = list(), list()
for testPrediction in DFAssembledTestPrediction.collect():
    xTest.append(testPrediction[PRED_G3[0]])
    yTest.append(testPrediction[PRED_PREDICTION[0]])

# Se crea el gráfico y se muestra
plt.clf()
plt.xlim(0, 20)
plt.ylim(0, 20)
plt.xlabel('Nota real')
plt.ylabel('Nota segun el modelo')
plt.title('Nota real vs prediccion Regresion lineal conjunto test')

plt.plot([0, 20], [0, 20], 'b')
# Se pasan los datos de entrenamiento y test al gráfico
plt.plot(xTrain, yTrain, 'go', label='Train')
plt.plot(xTest, yTest, 'ro', label='Test')
plt.legend(loc='lower right')

plt.show()
display()


En los que se puede ver una gran diferencia entre lo predicho y lo real es cuando esta última es 0.

___
## Modelo de clasificación: Árboles de decisión 

Se contruirá el modelo de clasificación con un árbol de decisión evaluado con **cross validation**, con los siguientes valores:

* maxDepth: 3, 6, 10
* maxBins: 20, 40, 80
* numFolds: 5

Una vez conseguido el modelo, se muestra el número de nodos, la profundidad y el árbol en sí del mejor de ellos:

In [30]:
# Se crea el evaluador para este modelo siendo la columna objetivo COL_PASS[0]
evaluatorBinaryClassification = BinaryClassificationEvaluator(labelCol=PRED_PASS[0])

# Se crea el árbol de decisión
decisionTreeClassifier = DecisionTreeClassifier(labelCol=PRED_PASS[0], featuresCol=PRED_FEATURES[0])
# Crossvalidation: los parámetros escogidos son:
#     * maxDepth: 3, 6, 10
#     * maxBins: 20, 40, 80
#     * numFolds: 5
gridTreeClassifier = ParamGridBuilder(). \
                        addGrid(decisionTreeClassifier.maxDepth, [3, 6, 10]). \
                        addGrid(decisionTreeClassifier.maxBins, [20, 40, 80]).build()
crossValidatorDecisionTree = CrossValidator(estimator=decisionTreeClassifier, \
                                                estimatorParamMaps=gridTreeClassifier, \
                                                evaluator=evaluatorBinaryClassification, \
                                                numFolds=5)
# Se obtiene el modelo con los datos de entrada
crossValidatorDecisionTreeModel = crossValidatorDecisionTree.fit(DFAssembledTrain)

# Se muestra el número de nodos, profundidad y el árbol para el mejor modelo
print('Número del nodos del mejor modelo: {0}'.format(crossValidatorDecisionTreeModel.bestModel.numNodes))
print('Profundidad del mejor modelo: {0}'.format(crossValidatorDecisionTreeModel.bestModel.depth))
print('Árbol mejor modelo: {0}'.format(crossValidatorDecisionTreeModel.bestModel.toDebugString))

Se puede ver que el campo más significativo en este árbol es feature 13 que corresponde a G2. Era de esperar que tuviese mucha importancia porque está muy correlado con la variable dependiente.

A continuación se obtiene la predicción para el conjunto de entrenamiento y el de test con el mejor modelo para posteriormente mostrar el **área bajo la curva ROC**:

In [32]:
# Se obtiene la predicción sobre el conjunto de test y entrenamiento
DFAssembledTrainClasification = crossValidatorDecisionTreeModel.bestModel.transform(DFAssembledTrain)
DFAssembledTestClasification = crossValidatorDecisionTreeModel.bestModel.transform(DFAssembledTest)

# Se obtiene el RMSE sobre los dos conjuntos
arearocDecisionTreeTrain= evaluatorBinaryClassification.evaluate(\
                                                                          DFAssembledTrainClasification, \
                                                                          {evaluatorBinaryClassification.metricName: 'areaUnderROC'})
arearocDecisionTreeTest = evaluatorBinaryClassification.evaluate(\
                                                                 DFAssembledTestClasification, \
                                                                 {evaluatorBinaryClassification.metricName: 'areaUnderROC'})
# Se imprime por pantalla
print('Área bajo la curva ROC en training: {0}'.format(arearocDecisionTreeTrain))
print('Área bajo la curva ROC en test: {0}'.format(arearocDecisionTreeTest))

Como se puede observar está muy cerca del ideal. Y por último se dibuja la curva ROC. El área bajo la curva no es la misma que la obtenida a través del modelo. Esto puede deberse a que no se está teniendo en cuenta la probabilidad que estima el modelo:

In [34]:

# Se crea la lista con la nota real y la predicha
xEntrenamiento, yEntrenamiento = list(), list()
for entrenamientoPrediccion in DFAssembledTrainClasification.collect():
    xEntrenamiento.append(entrenamientoPrediccion[PRED_PASS[0]])
    yEntrenamiento.append(entrenamientoPrediccion[PRED_PREDICTION[0]])
xTest, yTest = list(), list()
for testPrediccion in DFAssembledTestClasification.collect():
    xTest.append(testPrediccion[PRED_PASS[0]])
    yTest.append(testPrediccion[PRED_PREDICTION[0]])

# Se obtienen los falsos positivos y verdaderos positivos y el valor bajo la curva para el test y el entrenamiento
falsePositiveRateEntrenamiento, truePositiveRateEntrenamiento, thresholdsEntrenamiento = roc_curve(xEntrenamiento, yEntrenamiento)
rocAucEntrenamiento = auc(falsePositiveRateEntrenamiento, truePositiveRateEntrenamiento)
falsePositiveRateTest, truePositiveRateTest, thresholdsTest = roc_curve(xTest, yTest)
rocAucTest = auc(falsePositiveRateTest, truePositiveRateTest)

# Se crea el gráfico y se muestra
plt.clf()
plt.title('Curva ROC con Arbol de decision')
# Se pasan los datos de entrenamiento y test al gráfico
plt.plot(falsePositiveRateEntrenamiento, truePositiveRateEntrenamiento, 'g', label='AUC Entrenamiento = %0.2f'% rocAucEntrenamiento)
plt.plot(falsePositiveRateTest, truePositiveRateTest, 'r', label='AUC Test = %0.2f'% rocAucTest)
plt.legend(loc='lower right')
plt.plot([0,1], [0,1], 'r--')
plt.xlim(0, 1)
plt.ylim(0, 1)
plt.ylabel('True Positive Ratio')
plt.xlabel('False Positive Ratio')
plt.show()
display()