# (Not so) Big Data
---
The following link points to a compressed data file: [here](https://drive.google.com/open?id=1Kr8k8tmN2ziskPwLW_A8lQ_M2-5vHKsa)

Download it and decompress it on your local machine. Once that is done, complete the following exercises.

**1. Start a local Spark session. Check that the Spark UI is active at [localhost:4040](localhost:4040)**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, mean, round

spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .master("local[*]") \
        .getOrCreate()
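
The exercise asks for a check that the UI is up, which the cell above does not perform. A minimal sketch of one way to do it, using only the Python standard library: the helper name `spark_ui_is_up` and the 2-second timeout are illustrative assumptions. With a live session, `spark.sparkContext.uiWebUrl` reports the actual address, which matters because Spark moves to the next free port (4041, 4042, ...) when 4040 is already taken.

```python
from urllib.error import URLError
from urllib.request import urlopen


def spark_ui_is_up(url="http://localhost:4040", timeout=2.0):
    """Return True if an HTTP GET on the Spark UI address succeeds."""
    try:
        with urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (URLError, OSError):
        # Connection refused, timeout, or HTTP error: treat the UI as down.
        return False


# With a live session, pass the address Spark reports rather than a guess:
#     spark_ui_is_up(spark.sparkContext.uiWebUrl)
```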

**2. Create a Spark DataFrame and load the file.**

*Note: the file is tab-separated.*

In [2]:
dfs_adds = spark.read \
            .option("inferSchema", "true") \
            .option("header", "false") \
            .option("delimiter", "\t") \
            .csv('/home/jovyan/MADM/opt/adds.txt')
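
`inferSchema` makes Spark read the file an extra time to guess the column types, which is noticeable on a file of this size. As a hedged alternative, the schema can be declared up front. The sketch below builds a DDL schema string from the column layout this notebook reports in its `printSchema()` and `dtypes` outputs (`_c0` to `_c13` integer, `_c14` to `_c39` string). The helper name `build_ddl_schema` is hypothetical, and passing a DDL string to `DataFrameReader.schema` assumes a reasonably recent Spark release (2.3 or later, as far as we know).

```python
def build_ddl_schema(n_int=14, n_total=40):
    """Build 'name TYPE' pairs: the first n_int columns INT, the rest STRING."""
    fields = []
    for i in range(n_total):
        col_type = "INT" if i < n_int else "STRING"
        fields.append(f"_c{i} {col_type}")
    return ", ".join(fields)


ddl_schema = build_ddl_schema()
print(ddl_schema)

# Usage with the session and path from the cell above (not executed here):
#     dfs_adds = spark.read \
#         .schema(ddl_schema) \
#         .option("header", "false") \
#         .option("delimiter", "\t") \
#         .csv('/home/jovyan/MADM/opt/adds.txt')
```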

<br>
We show the total number of rows in the DataFrame:

In [3]:
dfs_adds.count()

45840617

<br>
And its schema:

In [4]:
dfs_adds.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (

**3. Show the first row. How many numeric variables are there?**

In [5]:
dfs_adds.show(1)

+---+---+---+---+---+----+---+---+---+---+----+----+----+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+----+--------+--------+--------+--------+
|_c0|_c1|_c2|_c3|_c4| _c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|    _c14|    _c15|    _c16|    _c17|    _c18|    _c19|    _c20|    _c21|    _c22|    _c23|    _c24|    _c25|    _c26|    _c27|    _c28|    _c29|    _c30|    _c31|    _c32|    _c33|    _c34|_c35|    _c36|    _c37|    _c38|    _c39|
+---+---+---+---+---+----+---+---+---+---+----+----+----+----+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+--------+----+--------+--------+--------+--------+
|  0|  1|  1|  5|  0|1382|  4| 15|  2|181|   1|   2|null|   2|68fd1e64|80e26c9b|fb936136|7b4723c4|25c83c98|7e0ccccf|de799

**4. Create a Spark DataFrame containing only the numeric variables, `dfs_num`**

We check the types of all the variables in the DataFrame:

In [6]:
dfs_adds.dtypes

[('_c0', 'int'),
 ('_c1', 'int'),
 ('_c2', 'int'),
 ('_c3', 'int'),
 ('_c4', 'int'),
 ('_c5', 'int'),
 ('_c6', 'int'),
 ('_c7', 'int'),
 ('_c8', 'int'),
 ('_c9', 'int'),
 ('_c10', 'int'),
 ('_c11', 'int'),
 ('_c12', 'int'),
 ('_c13', 'int'),
 ('_c14', 'string'),
 ('_c15', 'string'),
 ('_c16', 'string'),
 ('_c17', 'string'),
 ('_c18', 'string'),
 ('_c19', 'string'),
 ('_c20', 'string'),
 ('_c21', 'string'),
 ('_c22', 'string'),
 ('_c23', 'string'),
 ('_c24', 'string'),
 ('_c25', 'string'),
 ('_c26', 'string'),
 ('_c27', 'string'),
 ('_c28', 'string'),
 ('_c29', 'string'),
 ('_c30', 'string'),
 ('_c31', 'string'),
 ('_c32', 'string'),
 ('_c33', 'string'),
 ('_c34', 'string'),
 ('_c35', 'string'),
 ('_c36', 'string'),
 ('_c37', 'string'),
 ('_c38', 'string'),
 ('_c39', 'string')]

Next, we keep only those of type **int**:

In [7]:
num_column_list = [item[0] for item in dfs_adds.dtypes if item[1] == 'int']
num_column_list

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11',
 '_c12',
 '_c13']
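
The filter above works because every numeric column in this file was inferred as `int`. A slightly more general sketch, in case a file also contains `bigint` or `double` columns: the helper `numeric_columns` and the sample `dtypes` list are illustrative and not part of the original notebook.

```python
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}


def numeric_columns(dtypes):
    """Return column names whose Spark type string is numeric.

    `dtypes` is a list of (name, type) pairs, as returned by DataFrame.dtypes.
    """
    return [
        name
        for name, dtype in dtypes
        if dtype in NUMERIC_TYPES or dtype.startswith("decimal")
    ]


# Hypothetical dtypes list mixing several types.
sample_dtypes = [("_c0", "int"), ("_c1", "bigint"), ("_c2", "double"),
                 ("_c3", "string"), ("_c4", "decimal(10,2)")]
print(numeric_columns(sample_dtypes))
```

With the notebook's DataFrame this would be used as `dfs_adds.select(numeric_columns(dfs_adds.dtypes))`.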

and put them into a new DataFrame called **dfs_num**:

In [8]:
dfs_num =  dfs_adds.select(num_column_list)
dfs_num.show(1)

+---+---+---+---+---+----+---+---+---+---+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4| _c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|
+---+---+---+---+---+----+---+---+---+---+----+----+----+----+
|  0|  1|  1|  5|  0|1382|  4| 15|  2|181|   1|   2|null|   2|
+---+---+---+---+---+----+---+---+---+---+----+----+----+----+
only showing top 1 row



**5. How many null values are there in each column?**

To avoid counting the nulls of each column by hand, we loop over all the columns and build a list holding one "*query*" (a column expression) per column that counts its nulls:

In [9]:
null_counter_querys = [count(when(col(c).isNull(), c)).alias(c) for c in dfs_num.columns]
null_counter_querys

[Column<b'count(CASE WHEN (_c0 IS NULL) THEN _c0 END) AS `_c0`'>,
 Column<b'count(CASE WHEN (_c1 IS NULL) THEN _c1 END) AS `_c1`'>,
 Column<b'count(CASE WHEN (_c2 IS NULL) THEN _c2 END) AS `_c2`'>,
 Column<b'count(CASE WHEN (_c3 IS NULL) THEN _c3 END) AS `_c3`'>,
 Column<b'count(CASE WHEN (_c4 IS NULL) THEN _c4 END) AS `_c4`'>,
 Column<b'count(CASE WHEN (_c5 IS NULL) THEN _c5 END) AS `_c5`'>,
 Column<b'count(CASE WHEN (_c6 IS NULL) THEN _c6 END) AS `_c6`'>,
 Column<b'count(CASE WHEN (_c7 IS NULL) THEN _c7 END) AS `_c7`'>,
 Column<b'count(CASE WHEN (_c8 IS NULL) THEN _c8 END) AS `_c8`'>,
 Column<b'count(CASE WHEN (_c9 IS NULL) THEN _c9 END) AS `_c9`'>,
 Column<b'count(CASE WHEN (_c10 IS NULL) THEN _c10 END) AS `_c10`'>,
 Column<b'count(CASE WHEN (_c11 IS NULL) THEN _c11 END) AS `_c11`'>,
 Column<b'count(CASE WHEN (_c12 IS NULL) THEN _c12 END) AS `_c12`'>,
 Column<b'count(CASE WHEN (_c13 IS NULL) THEN _c13 END) AS `_c13`'>]

From this list, we count the nulls in each column:

In [10]:
dfs_num.select(null_counter_querys).show()

+---+--------+---+-------+-------+-------+--------+-------+-----+-------+--------+-------+--------+-------+
|_c0|     _c1|_c2|    _c3|    _c4|    _c5|     _c6|    _c7|  _c8|    _c9|    _c10|   _c11|    _c12|   _c13|
+---+--------+---+-------+-------+-------+--------+-------+-----+-------+--------+-------+--------+-------+
|  0|20793556|  0|9839447|9937369|1183117|10252328|1982866|22773|1982866|20793556|1982866|35071652|9937369|
+---+--------+---+-------+-------+-------+--------+-------+-----+-------+--------+-------+--------+-------+
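
Raw counts are easier to interpret as a share of the 45,840,617 rows counted earlier. The following plain-Python sketch reuses the figures from the output above; the variable names are illustrative.

```python
total_rows = 45840617

# Null counts copied from the output above.
null_counts = {
    "_c0": 0, "_c1": 20793556, "_c2": 0, "_c3": 9839447, "_c4": 9937369,
    "_c5": 1183117, "_c6": 10252328, "_c7": 1982866, "_c8": 22773,
    "_c9": 1982866, "_c10": 20793556, "_c11": 1982866, "_c12": 35071652,
    "_c13": 9937369,
}

# Share of nulls per column, largest first.
null_share = {c: n / total_rows for c, n in null_counts.items()}
for column, share in sorted(null_share.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{column}: {share:.1%}")
```

`_c12` is null in roughly three out of four rows, so whatever value replaces its nulls will dominate that column.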



**6. What is the mean of each column?**

We do the same as in the previous exercise, this time to compute the mean (Spark's `mean` ignores null values):

In [11]:
mean_querys = [round(mean(c), 4).alias(c) for c in dfs_num.columns]
mean_querys

[Column<b'round(avg(_c0), 4) AS `_c0`'>,
 Column<b'round(avg(_c1), 4) AS `_c1`'>,
 Column<b'round(avg(_c2), 4) AS `_c2`'>,
 Column<b'round(avg(_c3), 4) AS `_c3`'>,
 Column<b'round(avg(_c4), 4) AS `_c4`'>,
 Column<b'round(avg(_c5), 4) AS `_c5`'>,
 Column<b'round(avg(_c6), 4) AS `_c6`'>,
 Column<b'round(avg(_c7), 4) AS `_c7`'>,
 Column<b'round(avg(_c8), 4) AS `_c8`'>,
 Column<b'round(avg(_c9), 4) AS `_c9`'>,
 Column<b'round(avg(_c10), 4) AS `_c10`'>,
 Column<b'round(avg(_c11), 4) AS `_c11`'>,
 Column<b'round(avg(_c12), 4) AS `_c12`'>,
 Column<b'round(avg(_c13), 4) AS `_c13`'>]

In [12]:
df_mean = dfs_num.select(mean_querys)
df_mean.show()

+------+------+--------+------+------+----------+--------+-------+------+--------+------+------+-----+------+
|   _c0|   _c1|     _c2|   _c3|   _c4|       _c5|     _c6|    _c7|   _c8|     _c9|  _c10|  _c11| _c12|  _c13|
+------+------+--------+------+------+----------+--------+-------+------+--------+------+------+-----+------+
|0.2562|3.5024|105.8484|26.913|7.3227|18538.9917|116.0619|16.3331|12.517|106.1098|0.6175|2.7328|0.991|8.2175|
+------+------+--------+------+------+----------+--------+-------+------+--------+------+------+-----+------+



**7. Replace the null values in each column with the mean**

To replace the null values in each column with the mean, we use the **df_mean** DataFrame from the previous exercise. We call **collect()** on it, which returns its rows to the driver as a Python *list*. This operation is useful after a filter or any other operation that returns a sufficiently small subset of the data.

In [13]:
mean_list = df_mean.collect()
mean_list

[Row(_c0=0.2562, _c1=3.5024, _c2=105.8484, _c3=26.913, _c4=7.3227, _c5=18538.9917, _c6=116.0619, _c7=16.3331, _c8=12.517, _c9=106.1098, _c10=0.6175, _c11=2.7328, _c12=0.991, _c13=8.2175)]

Next, we convert the single row in that list into a Python *dictionary* whose keys are the DataFrame's column names. When **fillna** is called with this dictionary to replace the nulls, it matches each key to its column and so applies each mean to its own column.

In [14]:
mean_dict = mean_list[0].asDict()
dfs_num_no_na = dfs_num.fillna(mean_dict)

<br>
Finally, we check that no null values remain in any column:

In [15]:
null_counter_querys_no_na = [count(when(col(c).isNull(), c)).alias(c) for c in dfs_num_no_na.columns]
dfs_num_no_na.select(null_counter_querys_no_na).show()

+---+---+---+---+---+---+---+---+---+---+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+
|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|   0|   0|   0|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+



and that the mean of the columns that had nulls has changed:

In [16]:
mean_querys_no_na = [round(mean(c), 4).alias(c) for c in dfs_num_no_na.columns]
dfs_num_no_na.select(mean_querys_no_na).show()

+------+------+--------+-------+------+----------+-------+-------+-------+--------+------+------+------+------+
|   _c0|   _c1|     _c2|    _c3|   _c4|       _c5|    _c6|    _c7|    _c8|     _c9|  _c10|  _c11|  _c12|  _c13|
+------+------+--------+-------+------+----------+-------+-------+-------+--------+------+------+------+------+
|0.2562|3.2745|105.8484|26.7171|7.2527|18538.9661|116.048|16.3187|12.5168|106.1051|0.3374|2.7011|0.2328|8.1703|
+------+------+--------+-------+------+----------+-------+-------+-------+--------+------+------+------+------+
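
Filling nulls with the exact mean would leave each mean unchanged, so the shift above deserves an explanation. The numbers are consistent with `fillna` casting each replacement value to the column's integer type, so that 0.991 is stored as 0 and 3.5024 as 3. The arithmetic below checks that hypothesis against the outputs of exercises 5 to 7. The helper `fill_nulls_with_exact_mean` is a hypothetical sketch of one way around it (casting to double first), not something the original notebook ran.

```python
total_rows = 45840617

# (mean before filling, null count), taken from the outputs of exercises 5 and 6.
observed = {
    "_c1": (3.5024, 20793556),
    "_c10": (0.6175, 20793556),
    "_c12": (0.991, 35071652),
}


def mean_after_truncated_fill(mean_before, nulls, total=total_rows):
    """Mean of a column once its nulls are replaced by int(mean_before)."""
    non_null = total - nulls
    return (mean_before * non_null + int(mean_before) * nulls) / total


for column, (mean_before, nulls) in observed.items():
    print(column, round(mean_after_truncated_fill(mean_before, nulls), 4))


def fill_nulls_with_exact_mean(df, columns):
    """Keep only `columns`, cast them to double, and fill nulls with the mean."""
    as_double = df.selectExpr(*[f"CAST(`{c}` AS DOUBLE) AS `{c}`" for c in columns])
    means = as_double.selectExpr(*[f"avg(`{c}`) AS `{c}`" for c in columns]).first().asDict()
    # A column that is entirely null has no mean; leave it untouched.
    means = {c: m for c, m in means.items() if m is not None}
    return as_double.fillna(means)
```

The three computed values agree to four decimals with the means reported after filling (3.2745, 0.3374 and 0.2328), which supports the truncation explanation. If exact mean imputation is wanted, something like `fill_nulls_with_exact_mean(dfs_num, dfs_num.columns)` could replace the `fillna` call.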



<h1 style='color:green'>Bonus</h1>

Spark lets you train ML models on large volumes of data in parallel. Take a look at the documentation [here](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#binomial-logistic-regression) and try to build a classification model using only the numeric variables above.

Use the last 10M rows as the test set.

The target variable is in the first column.

**Build a classification model in Spark using only the numeric variables. What ROC AUC do you get?**

First, the predictors (*features*) and the variable to predict (*label*) must be separated as follows:

In [17]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["_c1", "_c2", "_c3", "_c4", "_c5", "_c6", "_c7", "_c8", "_c9", "_c10", "_c11", "_c12", "_c13"],
    outputCol="features")

output = assembler.transform(dfs_num_no_na)
df_ml = output.select("features", "_c0").withColumnRenamed("_c0", "label")

df_ml.show(truncate=False)

+----------------------------------------------------------------+-----+
|features                                                        |label|
+----------------------------------------------------------------+-----+
|[1.0,1.0,5.0,0.0,1382.0,4.0,15.0,2.0,181.0,1.0,2.0,0.0,2.0]     |0    |
|[2.0,0.0,44.0,1.0,102.0,8.0,2.0,2.0,4.0,1.0,1.0,0.0,4.0]        |0    |
|[2.0,0.0,1.0,14.0,767.0,89.0,4.0,2.0,245.0,1.0,3.0,3.0,45.0]    |0    |
|(13,[0,1,2,3,4,5,12],[3.0,893.0,26.0,7.0,4392.0,116.0,8.0])     |0    |
|(13,[0,1,2,4,6,9,10],[3.0,-1.0,26.0,2.0,3.0,1.0,1.0])           |0    |
|[3.0,-1.0,26.0,7.0,12824.0,116.0,0.0,0.0,6.0,0.0,0.0,0.0,8.0]   |0    |
|[3.0,1.0,2.0,7.0,3168.0,116.0,0.0,1.0,2.0,0.0,0.0,0.0,8.0]      |0    |
|(13,[0,1,2,6,9,10],[1.0,4.0,2.0,1.0,1.0,1.0])                   |1    |
|[3.0,44.0,4.0,8.0,19010.0,249.0,28.0,31.0,141.0,0.0,1.0,0.0,8.0]|0    |
|[3.0,35.0,26.0,1.0,33737.0,21.0,1.0,2.0,3.0,0.0,1.0,0.0,1.0]    |0    |
|[3.0,2.0,632.0,0.0,56770.0,116.0,0.0,5.0,65.0,0.0,

<br>
Next, we split the dataset into training and test data. We could not take the last 10M rows as the test set because we did not find a method that was efficient in both time and machine resources. For this reason, we use **randomSplit**, which assigns rows at random, here roughly 80% to training and 20% to testing.

In [18]:
training_data, testing_data = df_ml.randomSplit([0.8, 0.2])
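
For the split the exercise actually asks for, one option is to number the rows and cut at a fixed index. The sketch below has not been run at the scale of this dataset and is offered only as a starting point under stated assumptions: `split_last_rows` is a hypothetical helper, it relies on `zipWithIndex` numbering rows in partition order (which for a single file read sequentially should follow file order), and routing every row through Python is likely to be slow on 45M rows.

```python
def split_last_rows(df, n_test):
    """Split `df` into (train, test), where test holds the last `n_test` rows."""
    cutoff = df.count() - n_test
    # zipWithIndex yields (row, index) pairs, indexed in partition order.
    indexed = df.rdd.zipWithIndex()
    train = indexed.filter(lambda pair: pair[1] < cutoff).map(lambda pair: pair[0])
    test = indexed.filter(lambda pair: pair[1] >= cutoff).map(lambda pair: pair[0])
    return train.toDF(df.schema), test.toDF(df.schema)


# Usage with the notebook's DataFrame (not executed here):
#     training_data, testing_data = split_last_rows(df_ml, 10_000_000)
```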

<br>
We fit a logistic regression on the training data:

In [20]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=10)

# Fit the model
log_reg = lr.fit(training_data)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(log_reg.coefficients))
print("Intercept: " + str(log_reg.intercept))

Coefficients: [0.00285541048927,0.000156361982228,0.000112500918501,0.0102146335971,-3.7697698062e-06,-0.00107398744422,-0.00184713706832,-0.0105201492498,9.86144925466e-05,0.428181704389,0.0531451879372,0.0281082807693,-0.0305578395883]
Intercept: -0.9628918853089147


In [21]:
predictions = log_reg.transform(testing_data)

predictions.show(5)

+--------------------+-----+--------------------+--------------------+----------+
|            features|label|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
|(13,[0,1,2,3,4,5,...|    0|[1.10461320147489...|[0.75112348229997...|       0.0|
|(13,[0,1,2,3,4,5,...|    0|[1.29816167736331...|[0.78552543378549...|       0.0|
|(13,[0,1,2,3,4,5,...|    0|[1.26221889061081...|[0.77940784094338...|       0.0|
|(13,[0,1,2,3,4,5,...|    0|[1.36990236512496...|[0.79736437872795...|       0.0|
|(13,[0,1,2,3,4,5,...|    1|[1.29290443248693...|[0.78463838878342...|       0.0|
+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows
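
The `probability` column is the logistic (sigmoid) function applied to `rawPrediction`. As a quick sanity check in plain Python, we can take the first component of each vector in the first row above. Both values are cut off by `show`, so the match is only approximate.

```python
import math


def sigmoid(z):
    """Logistic function: maps a real-valued score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


raw_first_component = 1.10461320147489    # rawPrediction[0], first row
prob_first_component = 0.75112348229997   # probability[0], first row

print(sigmoid(raw_first_component))
print(abs(sigmoid(raw_first_component) - prob_first_component) < 1e-6)
```

The first component refers to class 0, so a probability of about 0.75 for class 0 yields `prediction = 0.0`.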



<br>
And finally, we compute the area under the ROC curve:

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

print("Training: Area Under ROC: " + str(log_reg.summary.areaUnderROC))

# Evaluate model
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
print("Test: Area Under ROC: " + str(evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})))

Training: Area Under ROC: 0.6816807614157969
Test: Area Under ROC: 0.6819624856937434
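
For intuition about the metric: the area under the ROC curve equals the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counted as one half. A toy sketch of that definition in plain Python follows. The labels and scores are made up for illustration and unrelated to the dataset, and the quadratic loop is only suitable for tiny inputs.

```python
def roc_auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5  # ties count as half a correctly ranked pair
    return wins / (len(positives) * len(negatives))


# Made-up toy data: 3 of the 4 (positive, negative) pairs are ordered correctly.
toy_labels = [0, 0, 1, 1]
toy_scores = [0.1, 0.4, 0.35, 0.8]
print(roc_auc(toy_labels, toy_scores))  # 0.75
```

On this scale 0.5 corresponds to random ranking and 1.0 to a perfect one, so the value of about 0.68 obtained above is better than chance but leaves considerable room for improvement. The near-identical training and test values suggest the model is not overfitting.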


<br>
### The references used for this bonus exercise were the following:

* https://spark.apache.org/docs/2.2.0/ml-classification-regression.html

* https://mapr.com/blog/churn-prediction-pyspark-using-mllib-and-ml-packages/

* https://wesslen.github.io/twitter/predicting_twitter_profile_location_with_pyspark/

* https://spark.apache.org/docs/2.1.0/ml-features.html#vectorassembler

* https://docs.databricks.com/spark/latest/mllib/index.html