# <font color='blue'> Open Data Project II </font>

Carlos Martín Hernández

## <font color='blue'> TABLE OF CONTENTS </font>
>1. [PySpark initialization](#imports1)
>2. [Data import](#imports2)
>3. [Logistic Regression model](#modelo1)
>4. [Random Forest model](#modelo2)
>5. [Model improvements](#mejoras)

### <a name='imports1'><font color='blue'> 1. PySpark initialization: </font></a>

The imports needed to initialize PySpark and to carry out the project are added.

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark

In [3]:
import os
import pyspark.sql.types as typ
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.types import StructType, StructField, LongType, StringType

In [4]:
sc = SparkContext('local')
spark = SparkSession(sc)

In [5]:
sqlContext = SQLContext(sc)



### <a name='imports2'><font color='blue'> 2. Data import: </font></a>

A "schema" is created that holds the new name and the type of each dataset column.

In [6]:
Labels = [
    ('TÍTULOS_CONSEGUIDOS', typ.IntegerType()),
    ('PRUEBA', typ.StringType()),
    ('EDAD_PARTICIPANTE', typ.IntegerType()),
    ('VALORACIÓN_PREVIA', typ.IntegerType()),
    ('PENALIZACIONES_FASE_1', typ.IntegerType()),
    ('PENALIZACIONES_FASE_2', typ.IntegerType()),
    ('PENALIZACIONES_FASE_3', typ.IntegerType()),
    ('PENALIZACIONES_FASE_4', typ.IntegerType()),
    ('PUNTUACIÓN', typ.IntegerType()),
    ('TIEMPO_INTENTO_1', typ.IntegerType()),
    ('TIEMPO_INTENTO_2', typ.IntegerType()),
    ('DIFERENCIA_T1_T2', typ.IntegerType()),
    ('BONUS_FASE_1', typ.IntegerType()),
    ('BONUS_FASE_2', typ.IntegerType()),
    ('BONUS_FASE_3', typ.IntegerType()),
    ('BONUS_FASE_4', typ.IntegerType()),
    ('ABANDONOS', typ.IntegerType())
]

schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in Labels])

The data is loaded and the previously created schema is applied to it.

In [7]:
data = pd.read_csv('competition_data.csv') 
competition = sqlContext.createDataFrame(data,schema)

Check the data type of each dataset column.

In [8]:
competition.dtypes

[('TÍTULOS_CONSEGUIDOS', 'int'),
 ('PRUEBA', 'string'),
 ('EDAD_PARTICIPANTE', 'int'),
 ('VALORACIÓN_PREVIA', 'int'),
 ('PENALIZACIONES_FASE_1', 'int'),
 ('PENALIZACIONES_FASE_2', 'int'),
 ('PENALIZACIONES_FASE_3', 'int'),
 ('PENALIZACIONES_FASE_4', 'int'),
 ('PUNTUACIÓN', 'int'),
 ('TIEMPO_INTENTO_1', 'int'),
 ('TIEMPO_INTENTO_2', 'int'),
 ('DIFERENCIA_T1_T2', 'int'),
 ('BONUS_FASE_1', 'int'),
 ('BONUS_FASE_2', 'int'),
 ('BONUS_FASE_3', 'int'),
 ('BONUS_FASE_4', 'int'),
 ('ABANDONOS', 'int')]

Check the size of the dataset (rows, columns).

In [9]:
print(data.shape)

(24853, 17)


### <a name='modelo1'><font color='blue'> 3. Logistic Regression model: </font></a>

The imports needed for this section are added.

In [10]:
import pyspark.ml.feature as ft
import pyspark.ml.classification as cl
from pyspark.sql.types import DoubleType
from pyspark.ml import Pipeline
import pyspark.ml.evaluation as ev

The "PRUEBA" variable is cast to a numeric type, as required by the Transformer created next.

In [11]:
competition = competition.withColumn('PRUEBA_ID', competition['PRUEBA'].cast(typ.DoubleType()))

Next, the encoder is created to transform the "PRUEBA_ID" variable into binary vectors.

In [12]:
encoder = ft.OneHotEncoder(inputCol='PRUEBA_ID', outputCol='PRUEBA_VEC')
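
As a rough illustration of what the encoder produces, the sketch below mimics one-hot encoding in plain Python. The helper `one_hot` is hypothetical (it is not part of PySpark), and the assumption of 10 categories with the last one dropped is inferred from the 9-element `PRUEBA_VEC` and the `dropLast=true` setting that appear in this notebook's outputs.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a dense one-hot list for a category index (illustrative helper)."""
    size = num_categories - 1 if drop_last else num_categories
    vec = [0.0] * size
    # With drop_last, the final category is encoded as an all-zero vector.
    if index < size:
        vec[index] = 1.0
    return vec

# PRUEBA_ID = 1.0 with 10 assumed categories gives a 9-element vector
# whose position 1 is set, comparable to SparseVector(9, {1: 1.0}).
encoded = one_hot(1, 10)
print(encoded)
```

Dropping the last category avoids a redundant column, since that category is already identified by all other positions being zero.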

The "featuresCreator" is created using "VectorAssembler".

In [13]:
featuresCreator = ft.VectorAssembler(inputCols=[col[0] for col in Labels[2:]] + [encoder.getOutputCol()],
                   outputCol='features')
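
The following plain-Python sketch shows, under simplifying assumptions, how the assembled vector ends up with 24 entries: the 15 numeric columns in `Labels[2:]` followed by the 9 slots of `PRUEBA_VEC`. The helper `assemble` is hypothetical, and the values are copied from the first test row printed further down in this notebook.

```python
# Values of the 15 numeric columns (Labels[2:]) for the first test row.
numeric_columns = [15, 89, 0, 0, 0, 0, 63, 130, 147, 17, 0, 0, 0, 0, 0]
# Dense form of PRUEBA_VEC = SparseVector(9, {1: 1.0}).
prueba_vec = [0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]

def assemble(scalars, vector):
    """Concatenate scalar columns and one vector column (illustrative helper)."""
    return [float(v) for v in scalars] + list(vector)

features = assemble(numeric_columns, prueba_vec)
# Keep only the non-zero entries, as a sparse vector would.
non_zero = {i: v for i, v in enumerate(features) if v != 0.0}
print(len(features), non_zero)
```

The one-hot slot lands at index 16 because the vector column starts right after the 15 scalar columns (15 + 1).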

With these preliminary steps done, the model can be created. In this case it is a classification model, specifically a Logistic Regression.

In [14]:
logistic = cl.LogisticRegression(
    maxIter=10, 
    regParam=0.01, 
    labelCol='TÍTULOS_CONSEGUIDOS')

The "TÍTULOS_CONSEGUIDOS" variable is cast to "double", as this model requires.

In [15]:
competition = competition.withColumn("TÍTULOS_CONSEGUIDOS", competition["TÍTULOS_CONSEGUIDOS"].cast(DoubleType()))

The "Pipeline" is built from the "encoder", the "featuresCreator" and the chosen model, the logistic one.

In [16]:
pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])

The dataset is split into training and test sets, in this case 80% training and 20% test.

In [17]:
competition_train, competition_test = competition.randomSplit([0.8, 0.2], seed=666)
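
The idea behind a weighted random split can be sketched in plain Python as follows. This is only an approximation of the concept, not Spark's implementation: the helper `random_split` is hypothetical, and it assigns each row by an independent random draw, which is why the resulting sizes are close to, but not exactly, 80/20.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to a split by an independent random draw (illustrative)."""
    total = sum(weights)
    # Cumulative, normalized boundaries, e.g. about [0.8, 1.0] for [0.8, 0.2].
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point leftovers.
            if u < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

train, test = random_split(range(1000), [0.8, 0.2], seed=666)
print(len(train), len(test))
```

Fixing the seed makes the split reproducible, which matters when the metrics of different models are compared.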

The logistic regression model is trained.

In [18]:
model = pipeline.fit(competition_train)

In [19]:
model_test = model.transform(competition_test)

In [20]:
model_test.take(1)

[Row(TÍTULOS_CONSEGUIDOS=0.0, PRUEBA='1', EDAD_PARTICIPANTE=15, VALORACIÓN_PREVIA=89, PENALIZACIONES_FASE_1=0, PENALIZACIONES_FASE_2=0, PENALIZACIONES_FASE_3=0, PENALIZACIONES_FASE_4=0, PUNTUACIÓN=63, TIEMPO_INTENTO_1=130, TIEMPO_INTENTO_2=147, DIFERENCIA_T1_T2=17, BONUS_FASE_1=0, BONUS_FASE_2=0, BONUS_FASE_3=0, BONUS_FASE_4=0, ABANDONOS=0, PRUEBA_ID=1.0, PRUEBA_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 15.0, 1: 89.0, 6: 63.0, 7: 130.0, 8: 147.0, 9: 17.0, 16: 1.0}), rawPrediction=DenseVector([-0.6889, 0.6889]), probability=DenseVector([0.3343, 0.6657]), prediction=1.0)]
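
The relation between the `rawPrediction` and `probability` columns in the row above can be checked by hand: for a binary logistic regression, the probability of the positive class is the logistic (sigmoid) function of the raw score. The sketch below uses only the value printed above; the function name `sigmoid` is chosen here for illustration.

```python
import math

def sigmoid(z):
    """Logistic function mapping a raw score to a probability."""
    return 1.0 / (1.0 + math.exp(-z))

raw_positive = 0.6889  # second entry of rawPrediction in the row above
p_positive = sigmoid(raw_positive)
p_negative = 1.0 - p_positive
print(round(p_negative, 4), round(p_positive, 4))
```

Since the positive-class probability exceeds 0.5, the row is predicted as class 1.0, which matches the `prediction` column.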

Finally, the model is evaluated by computing "areaUnderROC" and "areaUnderPR".

In [21]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='TÍTULOS_CONSEGUIDOS')

print(evaluator.evaluate(model_test, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(model_test, {evaluator.metricName: 'areaUnderPR'}))

0.7356634133277236
0.8909169191530993
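
To make the first metric more concrete, the sketch below computes the area under the ROC curve on a tiny made-up example, using its interpretation as the fraction of (positive, negative) pairs in which the positive row receives the higher score. The helper `auc_pairwise` and the toy labels and scores are illustrative assumptions; Spark's evaluator builds the curve from thresholds rather than from pairs, so this is a conceptual sketch only.

```python
def auc_pairwise(labels, scores):
    """Fraction of positive/negative pairs ranked correctly (ties count half)."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
print(auc_pairwise(toy_labels, toy_scores))
```

Because only the ordering of the scores matters in this formulation, any strictly increasing transformation of the scores leaves the value unchanged. That is presumably why passing the "probability" column as the score column, as done in the evaluator above, still gives a meaningful ROC area for a binary model.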


### <a name='modelo2'><font color='blue'> 4. Random Forest model: </font></a>

The imports needed for this section are added.

In [22]:
import pyspark.sql.functions as func
from pyspark.ml.feature import StringIndexer
import pyspark.ml.classification as cl
import pyspark.ml.tuning as tune
import pyspark.ml.evaluation as ev

The "indexer" is created to obtain the "TÍTULOS_CONSEGUIDOS_indexer" column.

In [23]:
indexer = StringIndexer(inputCol="TÍTULOS_CONSEGUIDOS", outputCol="TÍTULOS_CONSEGUIDOS_indexer").fit(competition)

The dataset is split into training and test sets, in this case 70% training and 30% test.

In [24]:
competition_train, competition_test = competition.randomSplit([0.7, 0.3], seed=666)

The Random Forest model is trained.

In [25]:
classifier = cl.RandomForestClassifier(
    numTrees=20, 
    maxDepth=25, 
    labelCol='TÍTULOS_CONSEGUIDOS_indexer')

# Note: this grid is built here but is not passed to the pipeline below.
grid = tune.ParamGridBuilder().addGrid(classifier.numTrees, [1, 3, 5]).addGrid(classifier.maxDepth, [5, 6, 7]).build()

pipeline = Pipeline(
    stages=[
        indexer,
        encoder,
        featuresCreator, 
        classifier])

model = pipeline.fit(competition_train)
test = model.transform(competition_test)

In [26]:
test.take(1)

[Row(TÍTULOS_CONSEGUIDOS=0.0, PRUEBA='1', EDAD_PARTICIPANTE=14, VALORACIÓN_PREVIA=89, PENALIZACIONES_FASE_1=0, PENALIZACIONES_FASE_2=0, PENALIZACIONES_FASE_3=0, PENALIZACIONES_FASE_4=0, PUNTUACIÓN=65, TIEMPO_INTENTO_1=170, TIEMPO_INTENTO_2=193, DIFERENCIA_T1_T2=23, BONUS_FASE_1=0, BONUS_FASE_2=0, BONUS_FASE_3=0, BONUS_FASE_4=0, ABANDONOS=0, PRUEBA_ID=1.0, TÍTULOS_CONSEGUIDOS_indexer=1.0, PRUEBA_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 14.0, 1: 89.0, 6: 65.0, 7: 170.0, 8: 193.0, 9: 23.0, 16: 1.0}), rawPrediction=DenseVector([15.9029, 4.0971]), probability=DenseVector([0.7951, 0.2049]), prediction=0.0)]
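
In the row above, the two `rawPrediction` entries add up to 20, the `numTrees` value used for the classifier, and dividing each entry by that sum reproduces the `probability` column. The short check below uses only the printed numbers; reading the raw prediction as accumulated per-tree votes is an interpretation, not something the notebook states.

```python
raw_prediction = [15.9029, 4.0971]  # rawPrediction from the row above

total = sum(raw_prediction)
# Normalizing the raw scores so that they sum to 1 yields the probabilities.
probability = [value / total for value in raw_prediction]
print(round(total, 4), [round(p, 4) for p in probability])
```

The larger share belongs to the first class, so the predicted label is 0.0, in line with the `prediction` column.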

Finally, the model is evaluated by computing "areaUnderROC" and "areaUnderPR".

In [27]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='TÍTULOS_CONSEGUIDOS_indexer')

print(evaluator.evaluate(test, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test, {evaluator.metricName: 'areaUnderPR'}))

0.7733841018038954
0.514262310299172


### <a name='mejoras'><font color='blue'> 5. Model improvements: </font></a>

The imports needed for this section are added.

In [28]:
from pyspark.ml.feature import VectorAssembler, ChiSqSelector, PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.param import Param, Params
import numpy as np
import pyspark.ml.feature as ft
from pyspark.sql import Row

The parameter "Grid" is built.

In [29]:
logistic = cl.LogisticRegression(labelCol='TÍTULOS_CONSEGUIDOS')
grid = tune.ParamGridBuilder().addGrid(logistic.maxIter, [2, 10, 50]).addGrid(logistic.regParam, [0.01, 0.05, 0.3]).build()
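
A parameter grid of this kind is the Cartesian product of the listed values. The plain-Python sketch below enumerates the same combinations with `itertools.product`; the name `grid_sketch` and the dictionary layout are illustrative and do not reproduce the objects that `ParamGridBuilder` returns.

```python
from itertools import product

max_iter_values = [2, 10, 50]
reg_param_values = [0.01, 0.05, 0.3]

# One dictionary per (maxIter, regParam) combination.
grid_sketch = [
    {"maxIter": max_iter, "regParam": reg_param}
    for max_iter, reg_param in product(max_iter_values, reg_param_values)
]
print(len(grid_sketch))
for combination in grid_sketch:
    print(combination)
```

Every extra value added to either list multiplies the number of candidate models, so grids are usually kept small.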

The "BinaryClassificationEvaluator" is used again to compare the models.

In [30]:
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='TÍTULOS_CONSEGUIDOS')

The "CrossValidator" sets up the logic that performs the validation.

In [31]:
cv = tune.CrossValidator(estimator=logistic, estimatorParamMaps=grid, evaluator=evaluator)
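
The sketch below illustrates k-fold cross-validation in plain Python, assuming 3 folds (the `numFolds` value that `cv.extractParamMap()` reports in this notebook) and the 9 parameter combinations of the grid. The helper `k_fold_indices` is hypothetical and assigns rows to folds by position, whereas Spark assigns them randomly.

```python
def k_fold_indices(n_rows, num_folds):
    """Yield (training, validation) index lists, one pair per fold."""
    folds = [list(range(i, n_rows, num_folds)) for i in range(num_folds)]
    for i, validation in enumerate(folds):
        training = [idx for j, fold in enumerate(folds) if j != i for idx in fold]
        yield training, validation

num_folds = 3        # numFolds reported by cv.extractParamMap()
num_param_maps = 9   # 3 maxIter values x 3 regParam values

splits = list(k_fold_indices(10, num_folds))
for training, validation in splits:
    print("validation rows:", validation)

# Each parameter combination is fitted once per fold.
print("model fits during the search:", num_folds * num_param_maps)
```

Each row is used for validation exactly once, and the metric of a parameter combination is averaged over the folds. After the search, the best combination is refitted on the full training data.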

A "Pipeline" containing only the feature transformations is created.

In [32]:
pipeline = Pipeline(stages=[encoder,featuresCreator])
data_transformer = pipeline.fit(competition_train)

Next, the "cvModel" is fitted; it returns the best estimated model.

In [33]:
cvModel = cv.fit(data_transformer.transform(competition_train))

Check whether it gives better values than the previous model.

In [34]:
# The test split is transformed here, so the variable is named accordingly.
data_test = data_transformer.transform(competition_test)
results = cvModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderPR'}))

0.7356634133277236
0.8909169191530993

0.7326975308766814
0.8929778735202063



To find out which parameters were the best:

In [35]:
results = [([{key.name: paramValue} for key, paramValue 
            in params.items()], metric) 
    for params, metric 
    in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)]

sorted(results, key = lambda el: el[1], reverse=True)[0]

([{'maxIter': 50}, {'regParam': 0.01}], 0.7420381791678892)

In [36]:
data_transformer.stages

[OneHotEncoderModel: uid=OneHotEncoder_f384ea12d213, dropLast=true, handleInvalid=error,
 VectorAssembler_754097b2c92e]

In [37]:
data_transformer.stages[-1]

VectorAssembler_754097b2c92e

In [38]:
[stage.coefficients for stage in data_transformer.stages if hasattr(stage, "coefficients")]

[]

In [39]:
data_transformer.explainParams()

''

Best parameters:

In [40]:
cvModel.bestModel.explainParams()

"aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)\nelasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)\nfamily: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)\nfeaturesCol: features column name. (default: features)\nfitIntercept: whether to fit an intercept term. (default: True)\nlabelCol: label column name. (default: label, current: TÍTULOS_CONSEGUIDOS)\nlowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)\nlowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. 

In [41]:
print('Best parameter (MaxIter): ', cvModel.bestModel._java_obj.getMaxIter())

Best parameter (MaxIter):  50


In [42]:
print('Best parameter (RegParam): ', cvModel.bestModel._java_obj.getRegParam())

Best parameter (RegParam):  0.01


In [43]:
l = cvModel.params

In [44]:
cv.extractParamMap()

 Param(parent='CrossValidator_19cf7bdfaefc', name='parallelism', doc='the number of threads to use when running parallel algorithms (>= 1).'): 1,
 Param(parent='CrossValidator_19cf7bdfaefc', name='seed', doc='random seed.'): 698957015427823036,
 Param(parent='CrossValidator_19cf7bdfaefc', name='numFolds', doc='number of folds for cross validation'): 3,
 Param(parent='CrossValidator_19cf7bdfaefc', name='foldCol', doc="Param for the column name of user specified fold number. Once this is specified, :py:class:`CrossValidator` won't do random k-fold split. Note that this column should be integer type with range [0, numFolds) and Spark will throw exception on out-of-range fold numbers."): '',
 Param(parent='CrossValidator_19cf7bdfaefc', name='estimator', doc='estimator to be cross-validated'): LogisticRegression_205d45399134,
 Param(parent='CrossValidator_19cf7bdfaefc', name='estimatorParamMaps', doc='estimator param maps'): [{Param(parent='LogisticRegression_205d45399134', name='maxIter', 

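A small example DataFrame is built next. Note that it overwrites "competition_train", so the cells below run on these five rows.

In [45]: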
comp_df = spark.createDataFrame([Row(V4366=0.0, V4460=0.232, V4916=-0.017, V1495=-0.104, V1639=0.005, V1967=-0.008, V3049=0.177, V3746=-0.675, V3869=-3.451, V524=0.004, V5409=0), Row(V4366=0.0, V4460=0.111, V4916=-0.003, V1495=-0.137, V1639=0.001, V1967=-0.01, V3049=0.01, V3746=-0.867, V3869=-2.759, V524=0.0, V5409=0), Row(V4366=0.0, V4460=-0.391, V4916=-0.003, V1495=-0.155, V1639=-0.006, V1967=-0.019, V3049=-0.706, V3746=0.166, V3869=0.189, V524=0.001, V5409=0), Row(V4366=0.0, V4460=0.098, V4916=-0.012, V1495=-0.108, V1639=0.005, V1967=-0.002, V3049=0.033, V3746=-0.787, V3869=-0.926, V524=0.002, V5409=0), Row(V4366=0.0, V4460=0.026, V4916=-0.004, V1495=-0.139, V1639=0.003, V1967=-0.006, V3049=-0.045, V3746=-0.208, V3869=-0.782, V524=0.001, V5409=0)])
competition_train = comp_df.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])

In [46]:
lr = LogisticRegression(maxIter=50, regParam=0.01)
print(lr.explainParams())

model2 = lr.fit(competition_train)
print(model2.extractParamMap())

aggregationDepth: suggested depth for treeAggregate (>= 2). (default: 2)
elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty. (default: 0.0)
family: The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial (default: auto)
featuresCol: features column name. (default: features)
fitIntercept: whether to fit an intercept term. (default: True)
labelCol: label column name. (default: label)
lowerBoundsOnCoefficients: The lower bounds on coefficients if fitting under bound constrained optimization. The bound matrix must be compatible with the shape (1, number of features) for binomial regression, or (number of classes, number of features) for multinomial regression. (undefined)
lowerBoundsOnIntercepts: The lower bounds on intercepts if fitting under bound constrained optimization. The bounds vector size must beequal wi

In [47]:
m = lr.copy()

In [48]:
m.getParam('regParam')

Param(parent='LogisticRegression_6a1d95f34035', name='regParam', doc='regularization parameter (>= 0).')

In [49]:
m.getMaxIter()

50

In [50]:
m.extractParamMap()

{Param(parent='LogisticRegression_6a1d95f34035', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_6a1d95f34035', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_6a1d95f34035', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_6a1d95f34035', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_6a1d95f34035', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_6a1d95f34035', name='labelCol', doc='label column name.'): 'label',
 Param(parent='LogisticRegression_6a1d95f34035', name='maxBlockSizeInMB', doc='maximum memory in MB for s

In [51]:
cv.explainParams()



Best model (the variable is named "bestPipeline", but it holds a "LogisticRegressionModel"):

In [52]:
bestPipeline = cvModel.bestModel

In [53]:
bestPipeline

LogisticRegressionModel: uid=LogisticRegression_205d45399134, numClasses=2, numFeatures=24

Best parameters:

In [54]:
bestParams = bestPipeline.extractParamMap()

In [55]:
bestParams

{Param(parent='LogisticRegression_205d45399134', name='aggregationDepth', doc='suggested depth for treeAggregate (>= 2).'): 2,
 Param(parent='LogisticRegression_205d45399134', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0,
 Param(parent='LogisticRegression_205d45399134', name='family', doc='The name of family which is a description of the label distribution to be used in the model. Supported options: auto, binomial, multinomial'): 'auto',
 Param(parent='LogisticRegression_205d45399134', name='featuresCol', doc='features column name.'): 'features',
 Param(parent='LogisticRegression_205d45399134', name='fitIntercept', doc='whether to fit an intercept term.'): True,
 Param(parent='LogisticRegression_205d45399134', name='labelCol', doc='label column name.'): 'TÍTULOS_CONSEGUIDOS',
 Param(parent='LogisticRegression_205d45399134', name='maxBlockSizeInMB', doc='maximum memo

PCA:

In [60]:
pca = PCA(k=10, inputCol="features", outputCol="pca_features")
model = pca.fit(competition_train)
model.transform(competition_train).collect()[0].pca_features

DenseVector([-3.4903, -0.4039, 0.3183, 0.0834, 0.0797, -0.0246, -0.0203, -0.0181, 0.0004, 0.0])

In [61]:
selector = ChiSqSelector(numTopFeatures=1, featuresCol="features", outputCol="selectedFeatures", labelCol="label")

result = selector.fit(competition_train).transform(competition_train)

print("ChiSqSelector output with top %d features selected" % selector.getNumTopFeatures())

result.show()

ChiSqSelector output with top 1 features selected
+--------------------+-----+----------------+
|            features|label|selectedFeatures|
+--------------------+-----+----------------+
|[0.0,0.232,-0.017...|    0|           [0.0]|
|[0.0,0.111,-0.003...|    0|           [0.0]|
|[0.0,-0.391,-0.00...|    0|           [0.0]|
|[0.0,0.098,-0.012...|    0|           [0.0]|
|[0.0,0.026,-0.004...|    0|           [0.0]|
+--------------------+-----+----------------+
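
The selector ranks features with a chi-squared test of independence between each feature (treated as categorical) and the label. The plain-Python sketch below computes that statistic for toy inputs; the helper `chi_square` is hypothetical and only illustrates the test statistic, not Spark's full selection procedure.

```python
from collections import Counter

def chi_square(feature_values, labels):
    """Chi-squared statistic of the feature/label contingency table."""
    n = len(labels)
    observed = Counter(zip(feature_values, labels))
    feature_totals = Counter(feature_values)
    label_totals = Counter(labels)
    statistic = 0.0
    for f_value, f_count in feature_totals.items():
        for l_value, l_count in label_totals.items():
            # Expected count if feature and label were independent.
            expected = f_count * l_count / n
            statistic += (observed[(f_value, l_value)] - expected) ** 2 / expected
    return statistic

# A feature that tracks the label perfectly gives a large statistic.
print(chi_square([0, 0, 1, 1], [0, 0, 1, 1]))
# With a single label value, as in the table above, the statistic is zero.
print(chi_square([0.232, 0.111, -0.391, 0.098, 0.026], [0, 0, 0, 0, 0]))
```

Since every label in the five example rows is 0, no feature can be distinguished from another by this test, so the selected column in the table above is not informative about feature relevance.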

