# (Not so) Big Data
---
En el siguiente enlace encontrarás un fichero de datos comprimido. [here](https://drive.google.com/open?id=1Kr8k8tmN2ziskPwLW_A8lQ_M2-5vHKsa)

Descargalo y descomprimelo en tu local. Una vez hecho esto, realiza los siguientes ejercicios

**Instalación de Spark para Python**

In [None]:
!pip install findspark

** 1. Carga una sesión de spark local. Comprueba que el UI de Spark está activo en [localhost:4040](localhost:4040)**

In [1]:
### Con SparkSession
from pyspark.sql import SparkSession

# crear sesión

spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("local[*]") \
        .getOrCreate()

In [3]:
spark

Como vemos la sesión de Spark se ha creado correctamente.

**2. Crea un dataframe de Spark y carga el fichero. **

Nota: esta separado por tabulador

Leemos los datos, teniendo en cuenta que no tienen cabecera y que estan separados por tabulaciones

In [2]:
df_spark = spark.read.option("inferSchema", "true").option("header", "false").option("delimiter", "\t").csv('data/adds/adds.txt')

** 3. Muestra la primera fila. ¿Cuántas variables numéricas hay?**

Mostramos la primera fila del conjunto de datos

In [5]:
# Mostramos la primera fila del data frame
#Opción 1:
df_spark.first()
#Opción 2:
df_spark.head(1)
#Opción 3:
df_spark.take(1)

[Row(_c0=0, _c1=1, _c2=1, _c3=5, _c4=0, _c5=1382, _c6=4, _c7=15, _c8=2, _c9=181, _c10=1, _c11=2, _c12=None, _c13=2, _c14='68fd1e64', _c15='80e26c9b', _c16='fb936136', _c17='7b4723c4', _c18='25c83c98', _c19='7e0ccccf', _c20='de7995b8', _c21='1f89b562', _c22='a73ee510', _c23='a8cd5504', _c24='b2cb9c98', _c25='37c9c164', _c26='2824a5f6', _c27='1adce6ef', _c28='8ba8b39a', _c29='891b62e7', _c30='e5ba7672', _c31='f54016b9', _c32='21ddcdc9', _c33='b1252a9d', _c34='07b5194c', _c35=None, _c36='3a171ecb', _c37='c5c50484', _c38='e8b83407', _c39='9727dd16')]

Mostramos la estructura del conjunto de datos y el tipo de cada una de las variables. Como vemos, coincide con la descripción proporcionada, donde las 14 primeras variables son de tipo numérico, incluyendo la variable respuesta catalogada como **_c0** de tipo binaria.

In [3]:
### Inspeccionamos el Dataframe
df_spark.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)
 |-- _c14: string (nullable = true)
 |-- _c15: string (nullable = true)
 |-- _c16: string (nullable = true)
 |-- _c17: string (nullable = true)
 |-- _c18: string (nullable = true)
 |-- _c19: string (nullable = true)
 |-- _c20: string (nullable = true)
 |-- _c21: string (nullable = true)
 |-- _c22: string (nullable = true)
 |-- _c23: string (nullable = true)
 |-- _c24: string (nullable = true)
 |-- _c25: string (nullable = true)
 |-- _c26: string (nullable = true)
 |-- _c27: string (

** 4. Crea un dataframe de spark sólo con las variables numéricas `dfs_num` **

En primer lugar, seleccionamos las variables de tipo numérico y las guardamos en la estructura pertinente.

In [17]:
columnList = [item[0] for item in df_spark.dtypes if item[1].startswith('int')]
columnList

['_c0',
 '_c1',
 '_c2',
 '_c3',
 '_c4',
 '_c5',
 '_c6',
 '_c7',
 '_c8',
 '_c9',
 '_c10',
 '_c11',
 '_c12',
 '_c13']

In [18]:
dfs_num = df_spark[columnList]

Finalmente, comprobamos que el cojunto obtenido es el requerido.

In [19]:
dfs_num.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: integer (nullable = true)
 |-- _c3: integer (nullable = true)
 |-- _c4: integer (nullable = true)
 |-- _c5: integer (nullable = true)
 |-- _c6: integer (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: integer (nullable = true)
 |-- _c9: integer (nullable = true)
 |-- _c10: integer (nullable = true)
 |-- _c11: integer (nullable = true)
 |-- _c12: integer (nullable = true)
 |-- _c13: integer (nullable = true)



** 5 ¿Cuántos valores nulos existen en cada columna? **

In [31]:
from pyspark.sql.functions import isnan, when, count, col, isnull

# Miramos primero valores NAN
dfs_num.select([count(when(isnan(c), c)).alias(c) for c in dfs_num.columns]).show()

+---+---+---+---+---+---+---+---+---+---+----+----+----+----+
|_c0|_c1|_c2|_c3|_c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+
|  0|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|   0|   0|   0|
+---+---+---+---+---+---+---+---+---+---+----+----+----+----+

+---+--------+---+-------+-------+-------+--------+-------+-----+-------+--------+-------+--------+-------+
|_c0|     _c1|_c2|    _c3|    _c4|    _c5|     _c6|    _c7|  _c8|    _c9|    _c10|   _c11|    _c12|   _c13|
+---+--------+---+-------+-------+-------+--------+-------+-----+-------+--------+-------+--------+-------+
|  0|20793556|  0|9839447|9937369|1183117|10252328|1982866|22773|1982866|20793556|1982866|35071652|9937369|
+---+--------+---+-------+-------+-------+--------+-------+-----+-------+--------+-------+--------+-------+



In [None]:
# Miramos después valores isNull
dfs_num.select([count(when(isnull(c), c)).alias(c) for c in dfs_num.columns]).show()

** 6. ¿Cuál es la media de cada una de las columnas? **

In [93]:
df_describe = dfs_num.describe().collect()

# Mostramos los valores de la media para cada columna
print(df_describe[1])

# Cogemos la fila que contiene los valores de la media
# Convertimos los valores a entero
mean_columns = list(map(float, list(df_describe[1][1:])))

Row(summary='mean', _c0='0.2562233837297609', _c1='3.5024133170754044', _c2='105.84841979766546', _c3='26.913041020611274', _c4='7.322680248873305', _c5='18538.991664871523', _c6='116.06185085211598', _c7='16.333130032135028', _c8='12.517042137556713', _c9='106.1098234380509', _c10='0.6175294977722137', _c11='2.7328343170173044', _c12='0.9910356287721244', _c13='8.217461161174054')


Como alternativa, podemos realizar la misma operación de forma más directa.

In [67]:
mean_columns = dfs_num.groupBy().mean().collect()
print(mean_columns)

** 7. Sustituye los valores de cada columna por la media **

In [None]:
for i in length(mean_columns)
    dfs_num.withColumn(dfs_num.columns[i], mean_columns[i])

In [98]:
dfs_num.withColumn("_c1", mean_columns[0])

AssertionError: col should be Column

<h1 style='color:green'>Bonus<span>

Spark permite entrenar modelos de ML sobre grandes volúmenes de datos en paralelo. Échale un vistazo a la documentación [here](https://spark.apache.org/docs/2.2.0/ml-classification-regression.html#binomial-logistic-regression) e intenta crear un modelo de clasificación usando sólo las variables numéricas anteriores.

Utiliza los últimos 10M de filas como set de testeo.

La variable objetivo esta en la primera columna.

** Crea un modelo de clasificación en Spark usando solo las variables numéricas. ¿Qué AUC ROC obtienes?**

In [92]:
from pyspark.ml.classification import LogisticRegression
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# Load training data
training=dfs_num.rdd.map(lambda x:(Vectors.dense(x[1:-1]), x[0])).toDF(["features", "label"])
training.head(3)

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

# We can also use the multinomial family for binary classification
mlr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8, family="multinomial")

# Fit the model
mlrModel = mlr.fit(training)

# Print the coefficients and intercepts for logistic regression with multinomial family
print("Multinomial coefficients: " + str(mlrModel.coefficientMatrix))
print("Multinomial intercepts: " + str(mlrModel.interceptVector))

KeyboardInterrupt: 