### PROYECTO OPEN DATA II: IMPLEMENTACIÓN DEL ALGORÍTMO DE REGRESIÓN LINEAL

Para la segunda parte de esta asignatura nos hemos sumergido en el mundo de programación con pyspark, utilizado en la dinámica de programación en distribuido. A través de pyspark hemos trabajado con un algoritmo de machine learning, asi pues pudiendo trabajar con las bases de uno de los fenómenos más populares en el mercado ahora mismo.

Hemos optado por la implementación de un algoritmo de regresión lineal, cuyo propósito es establecer un modelo para la relación entre características y una variable objetivo. Para el dataset con el que trabajamos nosotras, la primera consiste de una serie de calificaciones optenidas por cierto estudiante y la segunda será la probabilidad de entrada de este estudiante en un máster en concreto.

Nuestros parámetros son los siguientes:
1. GRE Scores ( de 0 a 340 )
2. TOEFL Scores ( de 0 a120 )
3. Valoración de la universidad (de 0 a 5 )
4. Declaración de propósito y carta de recomendación (de 0 a 5)
5. GPA Scores (de 0 a 10)
6. Experiencia en investigación (0 o 1)
7. Probabilidad de ser admitido (entre 0 y 1)

Aplicamos nuestro algoritmo, y lo utilizamos para calcular algunos valores que nos ayudan a medir la eficacia de este. A continuación manejamos una mejora del modelo en cuestión, empleando técnicas de hyper-tuning de los parámetros y grid search, y acabando con una extracción de características y PCA.

**Preparación del entorno de trabajo**

Montamos el entorno de programcación con pyspark, importandonos los módulos necesarios.
En este caso trabajamos con SparkContext, SparkSession y SQLContext.

In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
sc=SparkContext(master="local[3]")
print(sc)
from pyspark.sql.session import SparkSession
spark = SparkSession(sc)
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

<SparkContext master=local[3] appName=pyspark-shell>


A continuación nos descargamos el dataset y realizamos la limpieza necesaria.

In [2]:
df = sqlContext.read.format("com.databricks.spark.csv").options(header='true',inferschema='true').load("Admission_Predict.csv")
display(df)

DataFrame[Serial No.: int, GRE Score: int, TOEFL Score: int, University Rating: int, SOP: double, LOR : double, CGPA: double, Research: int, Chance of Admit : double]

Solo hemos considerado necesario cambiar el nombre de una columna, ya que tenía un punto que en algunos métodos daba problemas.

In [3]:
df = df.withColumnRenamed("Serial No.", "Serial No")

Comprobamos el dataset.

In [4]:
df.show(5)

+---------+---------+-----------+-----------------+---+----+----+--------+----------------+
|Serial No|GRE Score|TOEFL Score|University Rating|SOP|LOR |CGPA|Research|Chance of Admit |
+---------+---------+-----------+-----------------+---+----+----+--------+----------------+
|        1|      337|        118|                4|4.5| 4.5|9.65|       1|            0.92|
|        2|      324|        107|                4|4.0| 4.5|8.87|       1|            0.76|
|        3|      316|        104|                3|3.0| 3.5| 8.0|       1|            0.72|
|        4|      322|        110|                3|3.5| 2.5|8.67|       1|             0.8|
|        5|      314|        103|                2|2.0| 3.0|8.21|       0|            0.65|
+---------+---------+-----------+-----------------+---+----+----+--------+----------------+
only showing top 5 rows



In [5]:
df.printSchema()

root
 |-- Serial No: integer (nullable = true)
 |-- GRE Score: integer (nullable = true)
 |-- TOEFL Score: integer (nullable = true)
 |-- University Rating: integer (nullable = true)
 |-- SOP: double (nullable = true)
 |-- LOR : double (nullable = true)
 |-- CGPA: double (nullable = true)
 |-- Research: integer (nullable = true)
 |-- Chance of Admit : double (nullable = true)



**Alogritmo de Regresión Lineal**

En primer lugar, convertimos los datos a dense vector: creamos features y label, las etiquetas con las que suelen trabajar los algoritmos machine learning.

In [6]:
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

In [7]:
def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [8]:
transformed= transData(df)
transformed.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,337.0,118.0,...| 0.92|
|[2.0,324.0,107.0,...| 0.76|
|[3.0,316.0,104.0,...| 0.72|
|[4.0,322.0,110.0,...|  0.8|
|[5.0,314.0,103.0,...| 0.65|
+--------------------+-----+
only showing top 5 rows



In [9]:
from pyspark.ml.feature import VectorIndexer

featureIndexer = VectorIndexer(inputCol="features", \
                               outputCol="indexedFeatures").fit(transformed)
data = featureIndexer.transform(transformed)

In [10]:
data.show(5,True)

+--------------------+-----+--------------------+
|            features|label|     indexedFeatures|
+--------------------+-----+--------------------+
|[1.0,337.0,118.0,...| 0.92|[1.0,337.0,118.0,...|
|[2.0,324.0,107.0,...| 0.76|[2.0,324.0,107.0,...|
|[3.0,316.0,104.0,...| 0.72|[3.0,316.0,104.0,...|
|[4.0,322.0,110.0,...|  0.8|[4.0,322.0,110.0,...|
|[5.0,314.0,103.0,...| 0.65|[5.0,314.0,103.0,...|
+--------------------+-----+--------------------+
only showing top 5 rows



Separamos los datos en datos de entrenamiento (training) y de testeo (test) (60% para training y 40% para testing)

In [11]:
(trainingData, testData) = transformed.randomSplit([0.6, 0.4])

In [12]:
trainingData.show(5)
testData.show(5)

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[2.0,324.0,107.0,...| 0.76|
|[3.0,316.0,104.0,...| 0.72|
|[5.0,314.0,103.0,...| 0.65|
|[6.0,330.0,115.0,...|  0.9|
|[7.0,321.0,109.0,...| 0.75|
+--------------------+-----+
only showing top 5 rows

+--------------------+-----+
|            features|label|
+--------------------+-----+
|[1.0,337.0,118.0,...| 0.92|
|[4.0,322.0,110.0,...|  0.8|
|[8.0,308.0,101.0,...| 0.68|
|[15.0,311.0,104.0...| 0.61|
|[17.0,317.0,107.0...| 0.66|
+--------------------+-----+
only showing top 5 rows



Definimos algoritmo de Regresion Lineal. Tiene tres parámetros, a los que para empezar ponemos valores por defecto.

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=2, regParam=0.5, elasticNetParam=0.5)

Montamos arquitectura Pipeline (segmentación de instrucciones): Permite implementar el paralelismo a nivel de instrucción en un único procesador.

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[featureIndexer, lr])
model = pipeline.fit(trainingData)

Obtenemos un resumen de nuestro modelo. Sacamos los resultados de los siguientes:
* **Error cuadrático medio**: un estimador que mide el promedio de los errores al cuadrado, es decir, la diferencia entre el estimador y lo que se estima
* **RMSE**: mide las diferencias entre los valores predichos por un modelo o un estimador y los valores observados.
* **R cuadrado**: (coeficiente de determinación) para predecir futuros resultados/probar una hipótesis (como de bien se pueden predecir futuros resultados). Se puede interpretar con como de cerca están los datos a la linea de regresión

In [None]:
lrModel = model.stages[-1]
trainingSummary = lrModel.summary

In [None]:
def modelsummary(model):
    import numpy as np
    Summary=model.summary
    
    print ("##",'-----------------------------------------------------')
    print ("##","Error Cuadrático Medio: % .6f" \
           % Summary.meanSquaredError, ", RMSE: % .6f" \
           % Summary.rootMeanSquaredError )
    print ("##","R cuadrado: %f" % Summary.r2, ", \
    Total iteraciones: %i"% Summary.totalIterations)
    print ("##",'-----------------------------------------------------')

In [None]:
modelsummary(model.stages[-1])

Procedemos a realizar predicciones con el test data:

In [None]:
predictions = model.transform(testData)

In [None]:
predictions.select("features","label","prediction").show(5)

**Evaluación de resultados**

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(labelCol="label",
                                predictionCol="prediction",
                                metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Raiz del error cuadrático medio (RMSE) de los datos de prueba = %g" % rmse)

In [None]:
y_true = predictions.select("label").toPandas()
y_pred = predictions.select("prediction").toPandas()

In [None]:
import sklearn.metrics
r2_score = sklearn.metrics.r2_score(y_true, y_pred) 
print('valor r2_: {0}'.format(r2_score))

**Mejora del modelo: Hyper-tunning**

El hyper-tunning trata de encontrar la mejor opción de combinación de valores de los parámetros de entrada de nuestro modelo. De tal forma podemos entrenar nuestro modelo previo para encontrar su rendimiento más óptimo.

**Grid search** es un algoritmo que itera a través de la lista de valores de los parámetros y estima los modelos de manera independiente y escoje la mejor opción.

En primer lugar, especificamos a nuestro modelo la lista de parámetros sobre la que vamos a iterar.

In [None]:
import pyspark.ml.tuning as tune
import pyspark.ml.classification as cl
import pyspark.ml.evaluation as ev

linear = LinearRegression(labelCol='label',featuresCol = 'indexedFeatures')
grid = tune.ParamGridBuilder().addGrid(linear.maxIter, [2, 10, 30]).addGrid(linear.regParam, [0.01, 0.07, 0.5]).addGrid(linear.elasticNetParam, [0.5, 0.4, 0.8]).build()

# BinaryClassificationEvaluator lo utilizamos para comparar los modelos (a través de la comparación de su rendimiento)
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='label')

In [None]:
cv = tune.CrossValidator(estimator=linear, estimatorParamMaps=grid, evaluator=evaluator)

In [None]:
pipeline2 = Pipeline(stages=[featureIndexer])
model2 = pipeline2.fit(trainingData)

cvModel nos devuelve el mejor modelo estimado.

In [None]:
cvModel = cv.fit(model2.transform(trainingData))

Sacamos el area bajo la curva ROC y PR

In [None]:
data_train = model2.transform(testData)
results = cvModel.transform(data_train)

print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderPR'}))

**Evaluación de los mejores parámetros para nuestro modelo**

In [None]:
results = [
    (
        [
            {key.name: paramValue} 
            for key, paramValue 
            in zip(
                params.keys(), 
                params.values())
        ], metric
    ) 
    for params, metric 
    in zip(
        cvModel.getEstimatorParamMaps(), 
        cvModel.avgMetrics
    )
]

sorted(results, key=lambda el: el[1], reverse=True)[0]

In [None]:
print ('Best Param (MaxIter): ', cvModel.bestModel._java_obj.getMaxIter())

In [None]:
print ('Best Param (RegParam): ', cvModel.bestModel._java_obj.getRegParam())

In [None]:
print ('Best Param (ElasticNetParam): ', cvModel.bestModel._java_obj.getElasticNetParam())

Una vez obtenidos los mejores parámetros para nuestro modelo, lo entrenamos con estos parámetros para obtener la mejor predicción en base a nuestros datos de entrada.